Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add load from pdf component #765

Merged
merged 4 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions components/load_from_pdf/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
FROM --platform=linux/amd64 python:3.8-slim as base

# System dependencies
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base as test
COPY tests/ tests/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

FROM base
COPY tests/ tests/
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]

69 changes: 69 additions & 0 deletions components/load_from_pdf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Load from pdf

<a id="load_from_pdf#description"></a>
## Description
Load pdf data stored locally or remote using langchain loaders.


<a id="load_from_pdf#inputs_outputs"></a>
## Inputs / outputs

<a id="load_from_pdf#consumes"></a>
### Consumes


**This component does not consume data.**


<a id="load_from_pdf#produces"></a>
### Produces
**This component produces:**

- pdf_path: string
- file_name: string
- text: string



<a id="load_from_pdf#arguments"></a>
## Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| pdf_path | str | The path to the a pdf file or a folder containing pdf files to load. Can be a local path or a remote path. If the path is remote, the loader class will be determined by the scheme of the path. | / |
| n_rows_to_load | int | Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale | / |
| index_column | str | Column to set index to in the load component, if not specified a default globally unique index will be set | / |
| n_partitions | int | Number of partitions of the dask dataframe. If not specified, the number of partitions will be equal to the number of CPU cores. Set to high values if the data is large and the pipelineis running out of memory. | / |

<a id="load_from_pdf#usage"></a>
## Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline


pipeline = Pipeline(...)

dataset = pipeline.read(
"load_from_pdf",
arguments={
# Add arguments
# "pdf_path": ,
# "n_rows_to_load": 0,
# "index_column": ,
# "n_partitions": 0,
},
)
```

<a id="load_from_pdf#testing"></a>
## Testing

You can run the tests using docker with BuildKit. From this directory, run:
```
docker build . --target test
```
41 changes: 41 additions & 0 deletions components/load_from_pdf/fondant_component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Load from pdf
description: |
Load pdf data stored locally or remote using langchain loaders.
image: fndnt/load_from_pdf:dev
tags:
- Data loading

produces:
pdf_path:
type: string
file_name:
type: string
text:
type: string

args:
pdf_path:
description: |
The path to the a pdf file or a folder containing pdf files to load.
Can be a local path or a remote path. If the path is remote, the loader class will be
determined by the scheme of the path.
type: str
n_rows_to_load:
description: |
Optional argument that defines the number of rows to load. Useful for testing pipeline runs
on a small scale
type: int
default: None
index_column:
description: |
Column to set index to in the load component, if not specified a default globally unique
index will be set
type: str
default: None
n_partitions:
description: |
Number of partitions of the dask dataframe. If not specified, the number of partitions will
be equal to the number of CPU cores. Set to high values if the data is large and the pipeline
is running out of memory.
type: int
default: None
1 change: 1 addition & 0 deletions components/load_from_pdf/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PyMuPDF==1.23.8
127 changes: 127 additions & 0 deletions components/load_from_pdf/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import logging
import os
import typing as t

import dask.dataframe as dd
import fitz
import fsspec as fs
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import OperationSpec

logger = logging.getLogger(__name__)


class PDFReader(DaskLoadComponent):
def __init__(
self,
spec: OperationSpec,
*,
pdf_path: str,
n_rows_to_load: t.Optional[int] = None,
index_column: t.Optional[str] = None,
n_partitions: t.Optional[int] = None,
) -> None:
"""
Args:
spec: the operation spec for the component
pdf_path: Path to the PDF file
n_rows_to_load: optional argument that defines the number of rows to load.
Useful for testing pipeline runs on a small scale.
index_column: Column to set index to in the load component, if not specified a default
globally unique index will be set.
n_partitions: Number of partitions of the dask dataframe. If not specified, the number
of partitions will be equal to the number of CPU cores. Set to high values if
the data is large and the pipeline is running out of memory.
"""
self.spec = spec
self.pdf_path = pdf_path
self.n_rows_to_load = n_rows_to_load
self.index_column = index_column
self.protocol = fs.utils.get_protocol(self.pdf_path)
self.fs, _ = fs.core.url_to_fs(self.pdf_path)
self.n_partitions = n_partitions if n_partitions is not None else os.cpu_count()

def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
)

def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
"""Function that sets a unique index based on the partition and row number."""
dataframe["id"] = 1
dataframe["id"] = (
str(partition_info["number"])
+ "_"
+ (dataframe.id.cumsum()).astype(str)
)
dataframe.index = dataframe.pop("id")
return dataframe

def _get_meta_df() -> pd.DataFrame:
meta_dict = {"id": pd.Series(dtype="object")}
for field_name, field in self.spec.inner_produces.items():
meta_dict[field_name] = pd.Series(
dtype=pd.ArrowDtype(field.type.value),
)
return pd.DataFrame(meta_dict).set_index("id")

meta = _get_meta_df()
dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
else:
logger.info(f"Setting `{self.index_column}` as index")
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

def load_pdf_from_fs(self, file_path: str):
with self.fs.open(file_path, "rb") as pdf_file:
pdf_bytes = pdf_file.read()

documents = fitz.open("pdf", pdf_bytes)
# get all text
text = "".join([document.get_text() for document in documents])
documents.close()

return text

def process_pdf(self, row):
file_path = row["pdf_path"]
text = self.load_pdf_from_fs(file_path)
row["file_name"] = file_path.split("/")[-1] # Extracting filename
row["text"] = text
return row

def load(self) -> dd.DataFrame:
try:
file_paths = self.fs.ls(self.pdf_path)
except NotADirectoryError:
file_paths = [self.pdf_path]

file_paths = [
file_path for file_path in file_paths if file_path.endswith(".pdf")
]

if self.n_rows_to_load is not None:
file_paths = file_paths[: self.n_rows_to_load]

dask_df = dd.from_pandas(
pd.DataFrame({"pdf_path": file_paths}),
npartitions=self.n_partitions,
)

meta_dict = {}
for field_name, field in self.spec.inner_produces.items():
meta_dict[field_name] = pd.Series(
dtype=pd.ArrowDtype(field.type.value),
)
meta_dict = pd.DataFrame(meta_dict)

dask_df = dask_df.map_partitions(
lambda part: part.apply(self.process_pdf, axis=1),
meta=meta_dict,
)

dask_df = self.set_df_index(dask_df)
return dask_df
47 changes: 47 additions & 0 deletions components/load_from_pdf/tests/component_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from pathlib import Path

import yaml
from fondant.core.component_spec import ComponentSpec, OperationSpec

from src.main import PDFReader


def test_pdf_reader():
"""Test the component with the ArxivReader.

This test requires a stable internet connection, both to download the loader, and to download
the papers from Arxiv.
"""
with open(Path(__file__).with_name("fondant_component.yaml")) as f:
print(f.name)
spec = ComponentSpec(yaml.safe_load(f))
spec = OperationSpec(spec)

pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"]

for path in pdf_path:
component = PDFReader(
spec=spec,
pdf_path=path,
n_rows_to_load=None,
index_column=None,
)

output_dataframe = component.load().compute()

assert output_dataframe.columns.tolist() == ["pdf_path", "file_name", "text"]

if path == "tests/test_file/dummy.pdf":
assert output_dataframe.shape == (1, 3)
assert output_dataframe["file_name"].tolist() == ["dummy.pdf"]
assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"]
else:
assert output_dataframe.shape == (2, 3)
assert output_dataframe["file_name"].tolist() == [
"dummy_2.pdf",
"dummy_1.pdf",
]
assert output_dataframe["text"].tolist() == [
"Dummy PDF file\n",
"Dummy PDF file\n",
]
34 changes: 34 additions & 0 deletions components/load_from_pdf/tests/fondant_component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Load from pdf
description: |
Load pdf data stored locally or remote using langchain loaders.
image: fndnt/load_from_pdf:dev
tags:
- Data loading

produces:
pdf_path:
type: string
file_name:
type: string
text:
type: string

args:
pdf_path:
description: |
The path to the a pdf file or a folder containing pdf files to load.
Can be a local path or a remote path. If the path is remote, the loader class will be
determined by the scheme of the path.
type: str
n_rows_to_load:
description: |
Optional argument that defines the number of rows to load. Useful for testing pipeline runs
on a small scale
type: int
default: None
index_column:
description: |
Column to set index to in the load component, if not specified a default globally unique
index will be set
type: str
default: None
2 changes: 2 additions & 0 deletions components/load_from_pdf/tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
pythonpath = ../src
1 change: 1 addition & 0 deletions components/load_from_pdf/tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest==7.4.2
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading