From 2738aadf8fd577b26269eaa140f6b39facd061eb Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Mon, 15 Jan 2024 11:10:49 +0100
Subject: [PATCH] Fix load from pdf component (#778)

PR that fixes the `load_from_pdf` component. Previously, the full operation
spec was passed as an argument to the component; with the new changes, only
the `produces` section of the spec needs to be passed in.
---
 components/load_from_pdf/src/main.py             | 14 ++++++++------
 components/load_from_pdf/tests/component_test.py | 10 ++++++----
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/components/load_from_pdf/src/main.py b/components/load_from_pdf/src/main.py
index f088f45f..88a4467e 100644
--- a/components/load_from_pdf/src/main.py
+++ b/components/load_from_pdf/src/main.py
@@ -7,7 +7,7 @@ import fsspec as fs
 import pandas as pd
 
 from fondant.component import DaskLoadComponent
-from fondant.core.component_spec import OperationSpec
+from fondant.core.schema import Field
 
 logger = logging.getLogger(__name__)
 
@@ -15,16 +15,17 @@ class PDFReader(DaskLoadComponent):
 
     def __init__(
         self,
-        spec: OperationSpec,
+        produces: t.Dict[str, Field],
         *,
         pdf_path: str,
         n_rows_to_load: t.Optional[int] = None,
         index_column: t.Optional[str] = None,
         n_partitions: t.Optional[int] = None,
+        **kwargs,
     ) -> None:
         """
         Args:
-            spec: the operation spec for the component
+            produces: The schema the component should produce
             pdf_path: Path to the PDF file
             n_rows_to_load: optional argument that defines the number of rows to load.
                 Useful for testing pipeline runs on a small scale.
@@ -33,8 +34,9 @@ def __init__(
             n_partitions: Number of partitions of the dask dataframe. If not specified, the number
                 of partitions will be equal to the number of CPU cores. Set to high values if
                 the data is large and the pipeline is running out of memory.
+            kwargs: Unhandled keyword arguments passed in by Fondant.
         """
-        self.spec = spec
+        self.produces = produces
         self.pdf_path = pdf_path
         self.n_rows_to_load = n_rows_to_load
         self.index_column = index_column
@@ -61,7 +63,7 @@ def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
 
         def _get_meta_df() -> pd.DataFrame:
             meta_dict = {"id": pd.Series(dtype="object")}
-            for field_name, field in self.spec.inner_produces.items():
+            for field_name, field in self.produces.items():
                 meta_dict[field_name] = pd.Series(
                     dtype=pd.ArrowDtype(field.type.value),
                 )
@@ -112,7 +114,7 @@ def load(self) -> dd.DataFrame:
             )
 
         meta_dict = {}
-        for field_name, field in self.spec.inner_produces.items():
+        for field_name, field in self.produces.items():
             meta_dict[field_name] = pd.Series(
                 dtype=pd.ArrowDtype(field.type.value),
             )
diff --git a/components/load_from_pdf/tests/component_test.py b/components/load_from_pdf/tests/component_test.py
index 41c8eb66..a0d7a6b6 100644
--- a/components/load_from_pdf/tests/component_test.py
+++ b/components/load_from_pdf/tests/component_test.py
@@ -13,15 +13,15 @@ def test_pdf_reader():
     the papers from Arxiv.
     """
     with open(Path(__file__).with_name("fondant_component.yaml")) as f:
-        print(f.name)
         spec = ComponentSpec(yaml.safe_load(f))
+        spec = OperationSpec(spec)
 
     pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"]
 
     for path in pdf_path:
         component = PDFReader(
-            spec=spec,
+            produces=dict(spec.inner_produces),
             pdf_path=path,
             n_rows_to_load=None,
             index_column=None,
@@ -37,9 +37,11 @@ def test_pdf_reader():
             assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"]
         else:
             assert output_dataframe.shape == (2, 3)
-            assert output_dataframe["file_name"].tolist() == [
-                "dummy_2.pdf",
+            file_names = output_dataframe["file_name"].tolist()
+            file_names.sort()
+            assert file_names == [
                 "dummy_1.pdf",
+                "dummy_2.pdf",
             ]
             assert output_dataframe["text"].tolist() == [
                 "Dummy PDF file\n",