diff --git a/components/load_from_pdf/Dockerfile b/components/load_from_pdf/Dockerfile new file mode 100644 index 00000000..a0810612 --- /dev/null +++ b/components/load_from_pdf/Dockerfile @@ -0,0 +1,30 @@ +FROM --platform=linux/amd64 python:3.8-slim as base + +# System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# Install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Install Fondant +# This is split from other requirements to leverage caching +ARG FONDANT_VERSION=main +RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} + +# Set the working directory to the component folder +WORKDIR /component +COPY src/ src/ + +FROM base as test +COPY tests/ tests/ +RUN pip3 install --no-cache-dir -r tests/requirements.txt +RUN python -m pytest tests + +FROM base +COPY tests/ tests/ +WORKDIR /component/src +ENTRYPOINT ["fondant", "execute", "main"] + diff --git a/components/load_from_pdf/README.md b/components/load_from_pdf/README.md new file mode 100644 index 00000000..d257f3dc --- /dev/null +++ b/components/load_from_pdf/README.md @@ -0,0 +1,69 @@ +# Load from pdf + + +## Description +Load pdf data stored locally or remote using langchain loaders. + + + +## Inputs / outputs + + +### Consumes + + +**This component does not consume data.** + + + +### Produces +**This component produces:** + +- pdf_path: string +- file_name: string +- text: string + + + + +## Arguments + +The component takes the following arguments to alter its behavior: + +| argument | type | description | default | +| -------- | ---- | ----------- | ------- | +| pdf_path | str | The path to the a pdf file or a folder containing pdf files to load. Can be a local path or a remote path. If the path is remote, the loader class will be determined by the scheme of the path. | / | +| n_rows_to_load | int | Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale | / | +| index_column | str | Column to set index to in the load component, if not specified a default globally unique index will be set | / | +| n_partitions | int | Number of partitions of the dask dataframe. If not specified, the number of partitions will be equal to the number of CPU cores. Set to high values if the data is large and the pipelineis running out of memory. | / | + + +## Usage + +You can add this component to your pipeline using the following code: + +```python +from fondant.pipeline import Pipeline + + +pipeline = Pipeline(...) + +dataset = pipeline.read( + "load_from_pdf", + arguments={ + # Add arguments + # "pdf_path": , + # "n_rows_to_load": 0, + # "index_column": , + # "n_partitions": 0, + }, +) +``` + + +## Testing + +You can run the tests using docker with BuildKit. From this directory, run: +``` +docker build . --target test +``` diff --git a/components/load_from_pdf/fondant_component.yaml b/components/load_from_pdf/fondant_component.yaml new file mode 100644 index 00000000..d1ec6147 --- /dev/null +++ b/components/load_from_pdf/fondant_component.yaml @@ -0,0 +1,41 @@ +name: Load from pdf +description: | + Load pdf data stored locally or remote using langchain loaders. +image: fndnt/load_from_pdf:dev +tags: + - Data loading + +produces: + pdf_path: + type: string + file_name: + type: string + text: + type: string + +args: + pdf_path: + description: | + The path to the a pdf file or a folder containing pdf files to load. + Can be a local path or a remote path. If the path is remote, the loader class will be + determined by the scheme of the path. + type: str + n_rows_to_load: + description: | + Optional argument that defines the number of rows to load. Useful for testing pipeline runs + on a small scale + type: int + default: None + index_column: + description: | + Column to set index to in the load component, if not specified a default globally unique + index will be set + type: str + default: None + n_partitions: + description: | + Number of partitions of the dask dataframe. If not specified, the number of partitions will + be equal to the number of CPU cores. Set to high values if the data is large and the pipeline + is running out of memory. + type: int + default: None diff --git a/components/load_from_pdf/requirements.txt b/components/load_from_pdf/requirements.txt new file mode 100644 index 00000000..9b0233e4 --- /dev/null +++ b/components/load_from_pdf/requirements.txt @@ -0,0 +1 @@ +PyMuPDF==1.23.8 \ No newline at end of file diff --git a/components/load_from_pdf/src/main.py b/components/load_from_pdf/src/main.py new file mode 100644 index 00000000..f088f45f --- /dev/null +++ b/components/load_from_pdf/src/main.py @@ -0,0 +1,127 @@ +import logging +import os +import typing as t + +import dask.dataframe as dd +import fitz +import fsspec as fs +import pandas as pd +from fondant.component import DaskLoadComponent +from fondant.core.component_spec import OperationSpec + +logger = logging.getLogger(__name__) + + +class PDFReader(DaskLoadComponent): + def __init__( + self, + spec: OperationSpec, + *, + pdf_path: str, + n_rows_to_load: t.Optional[int] = None, + index_column: t.Optional[str] = None, + n_partitions: t.Optional[int] = None, + ) -> None: + """ + Args: + spec: the operation spec for the component + pdf_path: Path to the PDF file + n_rows_to_load: optional argument that defines the number of rows to load. + Useful for testing pipeline runs on a small scale. + index_column: Column to set index to in the load component, if not specified a default + globally unique index will be set. + n_partitions: Number of partitions of the dask dataframe. If not specified, the number + of partitions will be equal to the number of CPU cores. Set to high values if + the data is large and the pipeline is running out of memory. + """ + self.spec = spec + self.pdf_path = pdf_path + self.n_rows_to_load = n_rows_to_load + self.index_column = index_column + self.protocol = fs.utils.get_protocol(self.pdf_path) + self.fs, _ = fs.core.url_to_fs(self.pdf_path) + self.n_partitions = n_partitions if n_partitions is not None else os.cpu_count() + + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.index_column is None: + logger.info( + "Index column not specified, setting a globally unique index", + ) + + def _set_unique_index(dataframe: pd.DataFrame, partition_info=None): + """Function that sets a unique index based on the partition and row number.""" + dataframe["id"] = 1 + dataframe["id"] = ( + str(partition_info["number"]) + + "_" + + (dataframe.id.cumsum()).astype(str) + ) + dataframe.index = dataframe.pop("id") + return dataframe + + def _get_meta_df() -> pd.DataFrame: + meta_dict = {"id": pd.Series(dtype="object")} + for field_name, field in self.spec.inner_produces.items(): + meta_dict[field_name] = pd.Series( + dtype=pd.ArrowDtype(field.type.value), + ) + return pd.DataFrame(meta_dict).set_index("id") + + meta = _get_meta_df() + dask_df = dask_df.map_partitions(_set_unique_index, meta=meta) + else: + logger.info(f"Setting `{self.index_column}` as index") + dask_df = dask_df.set_index(self.index_column, drop=True) + + return dask_df + + def load_pdf_from_fs(self, file_path: str): + with self.fs.open(file_path, "rb") as pdf_file: + pdf_bytes = pdf_file.read() + + documents = fitz.open("pdf", pdf_bytes) + # get all text + text = "".join([document.get_text() for document in documents]) + documents.close() + + return text + + def process_pdf(self, row): + file_path = row["pdf_path"] + text = self.load_pdf_from_fs(file_path) + row["file_name"] = file_path.split("/")[-1] # Extracting filename + row["text"] = text + return row + + def load(self) -> dd.DataFrame: + try: + file_paths = self.fs.ls(self.pdf_path) + except NotADirectoryError: + file_paths = [self.pdf_path] + + file_paths = [ + file_path for file_path in file_paths if file_path.endswith(".pdf") + ] + + if self.n_rows_to_load is not None: + file_paths = file_paths[: self.n_rows_to_load] + + dask_df = dd.from_pandas( + pd.DataFrame({"pdf_path": file_paths}), + npartitions=self.n_partitions, + ) + + meta_dict = {} + for field_name, field in self.spec.inner_produces.items(): + meta_dict[field_name] = pd.Series( + dtype=pd.ArrowDtype(field.type.value), + ) + meta_dict = pd.DataFrame(meta_dict) + + dask_df = dask_df.map_partitions( + lambda part: part.apply(self.process_pdf, axis=1), + meta=meta_dict, + ) + + dask_df = self.set_df_index(dask_df) + return dask_df diff --git a/components/load_from_pdf/tests/component_test.py b/components/load_from_pdf/tests/component_test.py new file mode 100644 index 00000000..41c8eb66 --- /dev/null +++ b/components/load_from_pdf/tests/component_test.py @@ -0,0 +1,47 @@ +from pathlib import Path + +import yaml +from fondant.core.component_spec import ComponentSpec, OperationSpec + +from src.main import PDFReader + + +def test_pdf_reader(): + """Test the component with the ArxivReader. + + This test requires a stable internet connection, both to download the loader, and to download + the papers from Arxiv. + """ + with open(Path(__file__).with_name("fondant_component.yaml")) as f: + print(f.name) + spec = ComponentSpec(yaml.safe_load(f)) + spec = OperationSpec(spec) + + pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"] + + for path in pdf_path: + component = PDFReader( + spec=spec, + pdf_path=path, + n_rows_to_load=None, + index_column=None, + ) + + output_dataframe = component.load().compute() + + assert output_dataframe.columns.tolist() == ["pdf_path", "file_name", "text"] + + if path == "tests/test_file/dummy.pdf": + assert output_dataframe.shape == (1, 3) + assert output_dataframe["file_name"].tolist() == ["dummy.pdf"] + assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"] + else: + assert output_dataframe.shape == (2, 3) + assert output_dataframe["file_name"].tolist() == [ + "dummy_2.pdf", + "dummy_1.pdf", + ] + assert output_dataframe["text"].tolist() == [ + "Dummy PDF file\n", + "Dummy PDF file\n", + ] diff --git a/components/load_from_pdf/tests/fondant_component.yaml b/components/load_from_pdf/tests/fondant_component.yaml new file mode 100644 index 00000000..b255587e --- /dev/null +++ b/components/load_from_pdf/tests/fondant_component.yaml @@ -0,0 +1,34 @@ +name: Load from pdf +description: | + Load pdf data stored locally or remote using langchain loaders. +image: fndnt/load_from_pdf:dev +tags: + - Data loading + +produces: + pdf_path: + type: string + file_name: + type: string + text: + type: string + +args: + pdf_path: + description: | + The path to the a pdf file or a folder containing pdf files to load. + Can be a local path or a remote path. If the path is remote, the loader class will be + determined by the scheme of the path. + type: str + n_rows_to_load: + description: | + Optional argument that defines the number of rows to load. Useful for testing pipeline runs + on a small scale + type: int + default: None + index_column: + description: | + Column to set index to in the load component, if not specified a default globally unique + index will be set + type: str + default: None diff --git a/components/load_from_pdf/tests/pytest.ini b/components/load_from_pdf/tests/pytest.ini new file mode 100644 index 00000000..bf6a8a51 --- /dev/null +++ b/components/load_from_pdf/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +pythonpath = ../src \ No newline at end of file diff --git a/components/load_from_pdf/tests/requirements.txt b/components/load_from_pdf/tests/requirements.txt new file mode 100644 index 00000000..2a929edc --- /dev/null +++ b/components/load_from_pdf/tests/requirements.txt @@ -0,0 +1 @@ +pytest==7.4.2 diff --git a/components/load_from_pdf/tests/test_file/dummy.pdf b/components/load_from_pdf/tests/test_file/dummy.pdf new file mode 100644 index 00000000..774c2ea7 Binary files /dev/null and b/components/load_from_pdf/tests/test_file/dummy.pdf differ diff --git a/components/load_from_pdf/tests/test_folder/dummy_1.pdf b/components/load_from_pdf/tests/test_folder/dummy_1.pdf new file mode 100644 index 00000000..774c2ea7 Binary files /dev/null and b/components/load_from_pdf/tests/test_folder/dummy_1.pdf differ diff --git a/components/load_from_pdf/tests/test_folder/dummy_2.pdf b/components/load_from_pdf/tests/test_folder/dummy_2.pdf new file mode 100644 index 00000000..774c2ea7 Binary files /dev/null and b/components/load_from_pdf/tests/test_folder/dummy_2.pdf differ