Add load from pdf component #765

Merged · 4 commits · Jan 11, 2024
Changes from 1 commit
2 changes: 0 additions & 2 deletions components/load_from_pdf/Dockerfile
@@ -20,8 +20,6 @@ COPY src/ src/

FROM base as test
COPY tests/ tests/
COPY test_file/ test_file/
COPY test_folder/ test_folder/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

1 change: 1 addition & 0 deletions components/load_from_pdf/README.md
@@ -19,6 +19,7 @@ Load pdf data stored locally or remote using langchain loaders.
### Produces
**This component produces:**

- pdf_path: string
- file_name: string
- text: string

2 changes: 2 additions & 0 deletions components/load_from_pdf/fondant_component.yaml
@@ -6,6 +6,8 @@ tags:
- Data loading

produces:
  pdf_path:
    type: string
  file_name:
    type: string
  text:
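The new pdf_path field joins file_name and text under produces. Inside the component, these string fields are turned into an empty, Arrow-typed meta frame that Dask uses to infer the output schema (see meta_dict in main.py below). A minimal sketch of that mapping, assuming pandas >= 2.0 with pyarrow installed; the variable names here are illustrative, not taken from the component:

```python
import pandas as pd
import pyarrow as pa

# Illustrative only: build an empty frame whose columns mirror the three
# string fields declared under `produces`, using Arrow-backed dtypes,
# similar to what the component passes as `meta` to map_partitions.
meta = pd.DataFrame(
    {
        "pdf_path": pd.Series(dtype=pd.ArrowDtype(pa.string())),
        "file_name": pd.Series(dtype=pd.ArrowDtype(pa.string())),
        "text": pd.Series(dtype=pd.ArrowDtype(pa.string())),
    }
)

print(meta.dtypes)  # every column reports string[pyarrow]
```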
5 changes: 1 addition & 4 deletions components/load_from_pdf/requirements.txt
@@ -1,4 +1 @@
langchain==0.0.353
langchain-community==0.0.7
pypdf==3.17.4
tqdm==4.65.0
PyMuPDF==1.23.8
83 changes: 43 additions & 40 deletions components/load_from_pdf/src/main.py
@@ -1,16 +1,13 @@
import logging
import os
import tempfile
import typing as t
from collections import defaultdict

import dask.dataframe as dd
import fitz
import fsspec as fs
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import OperationSpec
from langchain_community.document_loaders import PyPDFDirectoryLoader
from tqdm import tqdm

logger = logging.getLogger(__name__)

@@ -73,47 +70,53 @@ def _get_meta_df() -> pd.DataFrame:

return dask_df

def load(self) -> dd.DataFrame:
if self.protocol == "file":
logger.info("Found PDF files local file system")

if self.fs.exists(self.pdf_path):
if self.fs.isdir(self.pdf_path):
pdf_dir = self.pdf_path
else:
pdf_dir = os.path.dirname(self.pdf_path)

loader = PyPDFDirectoryLoader(pdf_dir)
documents = loader.load()
def load_pdf_from_fs(self, file_path: str):
with self.fs.open(file_path, "rb") as pdf_file:
pdf_bytes = pdf_file.read()

else:
msg = "PDF path does not exist"
raise ValueError(msg)

else:
logger.info("Found PDF files on remote file system")
documents = fitz.open("pdf", pdf_bytes)
# get all text
text = "".join([document.get_text() for document in documents])
documents.close()

files = self.fs.ls(self.pdf_path)
return text

with tempfile.TemporaryDirectory() as temp_dir:
for file_path in tqdm(files):
if file_path.endswith(".pdf"):
file_name = os.path.basename(file_path)
temp_file_path = os.path.join(temp_dir, file_name)
self.fs.get(file_path, temp_file_path)
def process_pdf(self, row):
file_path = row["pdf_path"]
text = self.load_pdf_from_fs(file_path)
row["file_name"] = file_path.split("/")[-1] # Extracting filename
row["text"] = text
return row

loader = PyPDFDirectoryLoader(temp_dir)
documents = loader.lazy_load()

doc_dict = defaultdict(list)
for doc_counter, document in enumerate(documents):
doc_dict["file_name"].append(os.path.basename(document.metadata["source"]))
doc_dict["text"].append(document.page_content)

if doc_counter == self.n_rows_to_load:
break
def load(self) -> dd.DataFrame:
try:
file_paths = self.fs.ls(self.pdf_path)
except NotADirectoryError:
file_paths = [self.pdf_path]

file_paths = [
file_path for file_path in file_paths if file_path.endswith(".pdf")
]

if self.n_rows_to_load is not None:
file_paths = file_paths[: self.n_rows_to_load]

dask_df = dd.from_pandas(
pd.DataFrame({"pdf_path": file_paths}),
npartitions=os.cpu_count(),
Review comment (Member): This could probably be a parameter so it can be upped for larger datasets.

Reply (Contributor Author): Added :)

)

meta_dict = {}
for field_name, field in self.spec.inner_produces.items():
meta_dict[field_name] = pd.Series(
dtype=pd.ArrowDtype(field.type.value),
)
meta_dict = pd.DataFrame(meta_dict)

dask_df = dd.from_dict(doc_dict, npartitions=1)
dask_df = dask_df.map_partitions(
lambda part: part.apply(self.process_pdf, axis=1),
meta=meta_dict,
)

dask_df = self.set_df_index(dask_df)
return dask_df
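Taken together, the new load path lists PDF paths with fsspec, reads each file's bytes, and extracts text with PyMuPDF inside a Dask map_partitions call. A condensed, standalone sketch of that flow, assuming only fsspec and PyMuPDF as pinned in requirements.txt; the function names here are illustrative, not part of the component:

```python
import fsspec
import fitz  # PyMuPDF, pinned as PyMuPDF==1.23.8 in requirements.txt


def list_pdf_paths(fs: fsspec.AbstractFileSystem, pdf_path: str) -> list:
    """List .pdf files under a directory, or fall back to a single file path."""
    try:
        paths = fs.ls(pdf_path)
    except NotADirectoryError:
        paths = [pdf_path]
    return [path for path in paths if path.endswith(".pdf")]


def extract_text(fs: fsspec.AbstractFileSystem, file_path: str) -> str:
    """Read the PDF bytes through fsspec and join the text of all pages."""
    with fs.open(file_path, "rb") as pdf_file:
        pdf_bytes = pdf_file.read()
    document = fitz.open("pdf", pdf_bytes)  # open from an in-memory byte string
    text = "".join(page.get_text() for page in document)
    document.close()
    return text


if __name__ == "__main__":
    # "file" for local paths; fsspec infers gcs/s3/abfs from remote URL schemes.
    fs = fsspec.filesystem("file")
    for path in list_pdf_paths(fs, "tests/test_folder"):
        print(path, repr(extract_text(fs, path)))
```

Per the review thread above, the hard-coded npartitions=os.cpu_count() was reportedly exposed as a component argument in a later commit of this PR.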
16 changes: 8 additions & 8 deletions components/load_from_pdf/tests/component_test.py
@@ -17,7 +17,7 @@ def test_pdf_reader():
spec = ComponentSpec(yaml.safe_load(f))
spec = OperationSpec(spec)

pdf_path = ["test_file/dummy.pdf", "test_folder"]
pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"]

for path in pdf_path:
component = PDFReader(
@@ -29,19 +29,19 @@

output_dataframe = component.load().compute()

assert output_dataframe.columns.tolist() == ["file_name", "text"]
assert output_dataframe.columns.tolist() == ["pdf_path", "file_name", "text"]

if path == "test_file/dummy.pdf":
assert output_dataframe.shape == (1, 2)
if path == "tests/test_file/dummy.pdf":
assert output_dataframe.shape == (1, 3)
assert output_dataframe["file_name"].tolist() == ["dummy.pdf"]
assert output_dataframe["text"].tolist() == ["Dumm y PDF file"]
assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"]
else:
assert output_dataframe.shape == (2, 2)
assert output_dataframe.shape == (2, 3)
assert output_dataframe["file_name"].tolist() == [
"dummy_2.pdf",
"dummy_1.pdf",
]
assert output_dataframe["text"].tolist() == [
"Dumm y PDF file",
"Dumm y PDF file",
"Dummy PDF file\n",
"Dummy PDF file\n",
]
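The expected strings change because the extraction backend changed: the old test went through langchain's PyPDFDirectoryLoader (pypdf), which rendered the fixture as "Dumm y PDF file", while PyMuPDF returns "Dummy PDF file\n". A small comparison sketch, assuming the dummy fixture is available locally and that pypdf (now dropped from requirements.txt) is still installed:

```python
import fitz  # PyMuPDF
from pypdf import PdfReader

FIXTURE = "tests/test_file/dummy.pdf"

# PyMuPDF: what the updated test expects, per the new assertions.
doc = fitz.open(FIXTURE)
print(repr("".join(page.get_text() for page in doc)))  # e.g. 'Dummy PDF file\n'
doc.close()

# pypdf: what the previous langchain-based loader produced, per the old assertions.
reader = PdfReader(FIXTURE)
print(repr("".join(page.extract_text() for page in reader.pages)))  # e.g. 'Dumm y PDF file'
```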
11 changes: 2 additions & 9 deletions components/load_from_pdf/tests/fondant_component.yaml
@@ -6,6 +6,8 @@ tags:
- Data loading

produces:
  pdf_path:
    type: string
  file_name:
    type: string
  text:
@@ -18,15 +20,6 @@ args:
      Can be a local path or a remote path. If the path is remote, the loader class will be
      determined by the scheme of the path.
    type: str
  loader_kwargs:
    description: |
      Keyword arguments to pass when instantiating the loader class. Check the documentation of
      the loader to check which arguments it accepts.
      GCS: https://python.langchain.com/docs/integrations/document_loaders/google_cloud_storage_file
      S3: https://python.langchain.com/docs/integrations/document_loaders/aws_s3_file
      Azure: https://python.langchain.com/docs/integrations/document_loaders/azure_blob_storage_file
    type: dict
    default: {}
  n_rows_to_load:
    description: |
      Optional argument that defines the number of rows to load. Useful for testing pipeline runs
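For orientation, a hypothetical snippet showing how the component might be wired into a pipeline using the two remaining arguments. The Pipeline constructor and read() signature are assumptions based on Fondant releases around this PR, not taken from this diff, so the exact names may differ:

```python
from fondant.pipeline import Pipeline

# Hypothetical usage sketch; the argument names below are assumptions,
# not documented API from this PR.
pipeline = Pipeline(
    name="pdf_ingestion",
    base_path="./fondant_artifacts",
)

dataset = pipeline.read(
    "load_from_pdf",
    arguments={
        "pdf_path": "gs://my-bucket/pdfs/",  # local dir, single file, or remote path
        "n_rows_to_load": 10,                # optional; handy for quick test runs
    },
)
```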