Add load from pdf component (#765)
Fixes ml6team/fondant-internal#54

PR that adds functionality to load pdf documents from various local and
remote storage backends.

The implementation differs from the solution suggested in
[#54](ml6team/fondant-internal#54) because:
* Accumulating different loaders and loading each document individually
is inefficient, since it would require initializing a client, temp
storage, ... on every invocation
([link](https://github.com/langchain-ai/langchain/blob/04caf07dee2e2843ab720e5b8f0c0e83d0b86a3e/libs/community/langchain_community/document_loaders/gcs_file.py#L62))
* The langchain cloud loaders don't have a unified interface
* Each would require specific arguments to be passed (in contrast,
fsspec is much simpler)
* Only the Google loader allows defining a custom loader class; the
rest use the `Unstructured` loader, which requires many system and
CUDA dependencies to install (a lot of overhead just for loading pdfs)

The current implementation copies the pdfs to temporary local storage
and loads them with the `PyPDFDirectoryLoader`; they are then loaded
lazily. The assumption for now is that the loaded docs won't exceed the
storage of the device, which should hold for most use cases. Later on,
we can think about how to optimize this further.
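The scheme-based dispatch that makes fsspec simpler than the per-cloud langchain loaders can be sketched in plain Python (a hypothetical `get_protocol` helper; fsspec provides the real thing via `fsspec.core.url_to_fs`):

```python
from urllib.parse import urlparse


def get_protocol(path: str) -> str:
    """Infer the storage backend from a path's URL scheme."""
    # "gs://bucket/doc.pdf" -> "gs", "s3://bucket/doc.pdf" -> "s3";
    # a bare local path has no scheme, so fall back to "file".
    return urlparse(path).scheme or "file"
```

A single string argument is thus enough to select the right filesystem client, instead of passing loader-specific credentials and arguments for each cloud provider.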
PhilippeMoussalli authored Jan 11, 2024
1 parent e677157 commit b422fc3
Showing 12 changed files with 352 additions and 0 deletions.
30 changes: 30 additions & 0 deletions components/load_from_pdf/Dockerfile
@@ -0,0 +1,30 @@
FROM --platform=linux/amd64 python:3.8-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base as test
COPY tests/ tests/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

FROM base
COPY tests/ tests/
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]

69 changes: 69 additions & 0 deletions components/load_from_pdf/README.md
@@ -0,0 +1,69 @@
# Load from pdf

<a id="load_from_pdf#description"></a>
## Description
Load pdf data stored locally or remotely using langchain loaders.


<a id="load_from_pdf#inputs_outputs"></a>
## Inputs / outputs

<a id="load_from_pdf#consumes"></a>
### Consumes


**This component does not consume data.**


<a id="load_from_pdf#produces"></a>
### Produces
**This component produces:**

- pdf_path: string
- file_name: string
- text: string



<a id="load_from_pdf#arguments"></a>
## Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| pdf_path | str | The path to a pdf file or a folder containing pdf files to load. Can be a local path or a remote path. If the path is remote, the loader class will be determined by the scheme of the path. | / |
| n_rows_to_load | int | Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale | / |
| index_column | str | Column to set index to in the load component, if not specified a default globally unique index will be set | / |
| n_partitions | int | Number of partitions of the dask dataframe. If not specified, the number of partitions will be equal to the number of CPU cores. Set to high values if the data is large and the pipeline is running out of memory. | / |
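As an illustration of the `n_partitions` fallback described in the table above (a hypothetical helper, not part of the component's API), the default resolves to the machine's core count:

```python
import os


def resolve_n_partitions(n_partitions=None):
    # When unset, default to one Dask partition per CPU core;
    # pass a larger value to shrink partitions and reduce memory pressure.
    return n_partitions if n_partitions is not None else os.cpu_count()
```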

<a id="load_from_pdf#usage"></a>
## Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline


pipeline = Pipeline(...)

dataset = pipeline.read(
    "load_from_pdf",
    arguments={
        # Add arguments
        # "pdf_path": ,
        # "n_rows_to_load": 0,
        # "index_column": ,
        # "n_partitions": 0,
    },
)
```

<a id="load_from_pdf#testing"></a>
## Testing

You can run the tests using docker with BuildKit. From this directory, run:
```
docker build . --target test
```
41 changes: 41 additions & 0 deletions components/load_from_pdf/fondant_component.yaml
@@ -0,0 +1,41 @@
name: Load from pdf
description: |
  Load pdf data stored locally or remotely using langchain loaders.
image: fndnt/load_from_pdf:dev
tags:
  - Data loading

produces:
  pdf_path:
    type: string
  file_name:
    type: string
  text:
    type: string

args:
  pdf_path:
    description: |
      The path to a pdf file or a folder containing pdf files to load.
      Can be a local path or a remote path. If the path is remote, the loader class will be
      determined by the scheme of the path.
    type: str
  n_rows_to_load:
    description: |
      Optional argument that defines the number of rows to load. Useful for testing pipeline runs
      on a small scale.
    type: int
    default: None
  index_column:
    description: |
      Column to set index to in the load component, if not specified a default globally unique
      index will be set.
    type: str
    default: None
  n_partitions:
    description: |
      Number of partitions of the dask dataframe. If not specified, the number of partitions will
      be equal to the number of CPU cores. Set to high values if the data is large and the pipeline
      is running out of memory.
    type: int
    default: None
1 change: 1 addition & 0 deletions components/load_from_pdf/requirements.txt
@@ -0,0 +1 @@
PyMuPDF==1.23.8
127 changes: 127 additions & 0 deletions components/load_from_pdf/src/main.py
@@ -0,0 +1,127 @@
import logging
import os
import typing as t

import dask.dataframe as dd
import fitz
import fsspec as fs
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import OperationSpec

logger = logging.getLogger(__name__)


class PDFReader(DaskLoadComponent):
    def __init__(
        self,
        spec: OperationSpec,
        *,
        pdf_path: str,
        n_rows_to_load: t.Optional[int] = None,
        index_column: t.Optional[str] = None,
        n_partitions: t.Optional[int] = None,
    ) -> None:
        """
        Args:
            spec: the operation spec for the component
            pdf_path: Path to the PDF file
            n_rows_to_load: optional argument that defines the number of rows to load.
                Useful for testing pipeline runs on a small scale.
            index_column: Column to set index to in the load component, if not specified a default
                globally unique index will be set.
            n_partitions: Number of partitions of the dask dataframe. If not specified, the number
                of partitions will be equal to the number of CPU cores. Set to high values if
                the data is large and the pipeline is running out of memory.
        """
        self.spec = spec
        self.pdf_path = pdf_path
        self.n_rows_to_load = n_rows_to_load
        self.index_column = index_column
        self.protocol = fs.utils.get_protocol(self.pdf_path)
        self.fs, _ = fs.core.url_to_fs(self.pdf_path)
        self.n_partitions = n_partitions if n_partitions is not None else os.cpu_count()

    def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
        if self.index_column is None:
            logger.info(
                "Index column not specified, setting a globally unique index",
            )

            def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
                """Function that sets a unique index based on the partition and row number."""
                dataframe["id"] = 1
                dataframe["id"] = (
                    str(partition_info["number"])
                    + "_"
                    + (dataframe.id.cumsum()).astype(str)
                )
                dataframe.index = dataframe.pop("id")
                return dataframe

            def _get_meta_df() -> pd.DataFrame:
                meta_dict = {"id": pd.Series(dtype="object")}
                for field_name, field in self.spec.inner_produces.items():
                    meta_dict[field_name] = pd.Series(
                        dtype=pd.ArrowDtype(field.type.value),
                    )
                return pd.DataFrame(meta_dict).set_index("id")

            meta = _get_meta_df()
            dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
        else:
            logger.info(f"Setting `{self.index_column}` as index")
            dask_df = dask_df.set_index(self.index_column, drop=True)

        return dask_df

    def load_pdf_from_fs(self, file_path: str):
        with self.fs.open(file_path, "rb") as pdf_file:
            pdf_bytes = pdf_file.read()

        documents = fitz.open("pdf", pdf_bytes)
        # get all text
        text = "".join([document.get_text() for document in documents])
        documents.close()

        return text

    def process_pdf(self, row):
        file_path = row["pdf_path"]
        text = self.load_pdf_from_fs(file_path)
        row["file_name"] = file_path.split("/")[-1]  # Extracting filename
        row["text"] = text
        return row

    def load(self) -> dd.DataFrame:
        try:
            file_paths = self.fs.ls(self.pdf_path)
        except NotADirectoryError:
            file_paths = [self.pdf_path]

        file_paths = [
            file_path for file_path in file_paths if file_path.endswith(".pdf")
        ]

        if self.n_rows_to_load is not None:
            file_paths = file_paths[: self.n_rows_to_load]

        dask_df = dd.from_pandas(
            pd.DataFrame({"pdf_path": file_paths}),
            npartitions=self.n_partitions,
        )

        meta_dict = {}
        for field_name, field in self.spec.inner_produces.items():
            meta_dict[field_name] = pd.Series(
                dtype=pd.ArrowDtype(field.type.value),
            )
        meta_dict = pd.DataFrame(meta_dict)

        dask_df = dask_df.map_partitions(
            lambda part: part.apply(self.process_pdf, axis=1),
            meta=meta_dict,
        )

        dask_df = self.set_df_index(dask_df)
        return dask_df
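The `_set_unique_index` helper above builds ids from the partition number and a cumulative row count; stripped of the pandas machinery, the scheme amounts to this pure-Python sketch (hypothetical `unique_ids` helper):

```python
def unique_ids(partition_number: int, n_rows: int) -> list:
    # cumsum over a column of ones yields 1..n_rows, so the ids are
    # "<partition>_1", "<partition>_2", ..., unique across partitions.
    return [f"{partition_number}_{i}" for i in range(1, n_rows + 1)]
```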
47 changes: 47 additions & 0 deletions components/load_from_pdf/tests/component_test.py
@@ -0,0 +1,47 @@
from pathlib import Path

import yaml
from fondant.core.component_spec import ComponentSpec, OperationSpec

from src.main import PDFReader


def test_pdf_reader():
    """Test the PDFReader component on a single pdf file and on a folder of pdf files."""
    with open(Path(__file__).with_name("fondant_component.yaml")) as f:
        spec = ComponentSpec(yaml.safe_load(f))
        spec = OperationSpec(spec)

    pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"]

    for path in pdf_path:
        component = PDFReader(
            spec=spec,
            pdf_path=path,
            n_rows_to_load=None,
            index_column=None,
        )

        output_dataframe = component.load().compute()

        assert output_dataframe.columns.tolist() == ["pdf_path", "file_name", "text"]

        if path == "tests/test_file/dummy.pdf":
            assert output_dataframe.shape == (1, 3)
            assert output_dataframe["file_name"].tolist() == ["dummy.pdf"]
            assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"]
        else:
            assert output_dataframe.shape == (2, 3)
            assert output_dataframe["file_name"].tolist() == [
                "dummy_2.pdf",
                "dummy_1.pdf",
            ]
            assert output_dataframe["text"].tolist() == [
                "Dummy PDF file\n",
                "Dummy PDF file\n",
            ]
34 changes: 34 additions & 0 deletions components/load_from_pdf/tests/fondant_component.yaml
@@ -0,0 +1,34 @@
name: Load from pdf
description: |
  Load pdf data stored locally or remotely using langchain loaders.
image: fndnt/load_from_pdf:dev
tags:
  - Data loading

produces:
  pdf_path:
    type: string
  file_name:
    type: string
  text:
    type: string

args:
  pdf_path:
    description: |
      The path to a pdf file or a folder containing pdf files to load.
      Can be a local path or a remote path. If the path is remote, the loader class will be
      determined by the scheme of the path.
    type: str
  n_rows_to_load:
    description: |
      Optional argument that defines the number of rows to load. Useful for testing pipeline runs
      on a small scale.
    type: int
    default: None
  index_column:
    description: |
      Column to set index to in the load component, if not specified a default globally unique
      index will be set.
    type: str
    default: None
2 changes: 2 additions & 0 deletions components/load_from_pdf/tests/pytest.ini
@@ -0,0 +1,2 @@
[pytest]
pythonpath = ../src
1 change: 1 addition & 0 deletions components/load_from_pdf/tests/requirements.txt
@@ -0,0 +1 @@
pytest==7.4.2
