diff --git a/components/caption_images/README.md b/components/caption_images/README.md index 2158b173b..defcbc0de 100644 --- a/components/caption_images/README.md +++ b/components/caption_images/README.md @@ -31,9 +31,9 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| model_id | str | Id of the BLIP model on the Hugging Face hub | Salesforce/blip-image-captioning-base | -| batch_size | int | Batch size to use for inference | 8 | -| max_new_tokens | int | Maximum token length of each caption | 50 | +| model_id | | Id of the BLIP model on the Hugging Face hub | Salesforce/blip-image-captioning-base | +| batch_size | | Batch size to use for inference | 8 | +| max_new_tokens | | Maximum token length of each caption | 50 | ## Usage diff --git a/components/chunk_text/README.md b/components/chunk_text/README.md index 26aab59fa..490f9e4e2 100644 --- a/components/chunk_text/README.md +++ b/components/chunk_text/README.md @@ -43,7 +43,7 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| chunk_strategy | int | The strategy to use for chunking the text. One of ['RecursiveCharacterTextSplitter', 'HTMLHeaderTextSplitter', 'CharacterTextSplitter', 'Language', 'MarkdownHeaderTextSplitter', 'MarkdownTextSplitter', 'SentenceTransformersTokenTextSplitter', 'LatexTextSplitter', 'SpacyTextSplitter', 'TokenTextSplitter', 'NLTKTextSplitter', 'PythonCodeTextSplitter', 'character', 'NLTK', 'SpaCy'] | RecursiveCharacterTextSplitter | +| chunk_strategy | str | The strategy to use for chunking the text. One of ['RecursiveCharacterTextSplitter', 'HTMLHeaderTextSplitter', 'CharacterTextSplitter', 'Language', 'MarkdownHeaderTextSplitter', 'MarkdownTextSplitter', 'SentenceTransformersTokenTextSplitter', 'LatexTextSplitter', 'SpacyTextSplitter', 'TokenTextSplitter', 'NLTKTextSplitter', 'PythonCodeTextSplitter', 'character', 'NLTK', 'SpaCy'] | RecursiveCharacterTextSplitter | | chunk_kwargs | dict | The arguments to pass to the chunking strategy | / | | language_text_splitter | str | The programming language to use for splitting text into sentences if "language" is selected as the splitter. Check https://python.langchain.com/docs/modules/data_connection/document_transformers/code_splitter for more information on supported languages. 
| / | diff --git a/components/embed_images/README.md b/components/embed_images/README.md index 8b59e56cf..6a8b5e6fc 100644 --- a/components/embed_images/README.md +++ b/components/embed_images/README.md @@ -31,8 +31,8 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| model_id | str | Model id of a CLIP model on the Hugging Face hub | openai/clip-vit-large-patch14 | -| batch_size | int | Batch size to use when embedding | 8 | +| model_id | | Model id of a CLIP model on the Hugging Face hub | openai/clip-vit-large-patch14 | +| batch_size | | Batch size to use when embedding | 8 | ## Usage diff --git a/components/evaluate_ragas/README.md b/components/evaluate_ragas/README.md index 2e25a1897..d666b72e8 100644 --- a/components/evaluate_ragas/README.md +++ b/components/evaluate_ragas/README.md @@ -34,9 +34,9 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| module | str | Module from which the LLM is imported. Defaults to langchain.llms | langchain.llms | -| llm_name | str | Name of the selected llm | / | -| llm_kwargs | dict | Arguments of the selected llm | / | +| llm_module_name | | Module from which the LLM is imported. Defaults to langchain.llms | langchain.chat_models | +| llm_class_name | | Name of the selected llm | ChatOpenAI | +| llm_kwargs | | Arguments of the selected llm | {'model_name': 'gpt-3.5-turbo'} | ## Usage @@ -55,9 +55,9 @@ dataset = dataset.apply( "evaluate_ragas", arguments={ # Add arguments - # "module": "langchain.llms", - # "llm_name": , - # "llm_kwargs": {}, + # "llm_module_name": "langchain.chat_models", + # "llm_class_name": "ChatOpenAI", + # "llm_kwargs": {'model_name': 'gpt-3.5-turbo'}, }, produces={ : , diff --git a/components/evaluate_ragas/fondant_component.yaml b/components/evaluate_ragas/fondant_component.yaml index cfa6fe22d..5dc3db0bb 100644 --- a/components/evaluate_ragas/fondant_component.yaml +++ b/components/evaluate_ragas/fondant_component.yaml @@ -19,13 +19,15 @@ produces: args: - module: + llm_module_name: description: Module from which the LLM is imported. Defaults to langchain.llms type: str - default: "langchain.llms" - llm_name: + default: "langchain.chat_models" + llm_class_name: description: Name of the selected llm type: str + default: "ChatOpenAI" llm_kwargs: description: Arguments of the selected llm type: dict + default: {"model_name":"gpt-3.5-turbo"} diff --git a/components/evaluate_ragas/src/main.py b/components/evaluate_ragas/src/main.py index 184b50b24..ba299b2fb 100644 --- a/components/evaluate_ragas/src/main.py +++ b/components/evaluate_ragas/src/main.py @@ -11,24 +11,26 @@ class RetrieverEval(PandasTransformComponent): def __init__( self, *, - module: str, - llm_name: str, + llm_module_name: str, + llm_class_name: str, llm_kwargs: dict, produces: t.Dict[str, t.Any], **kwargs, ) -> None: """ Args: + llm_module_name: Module from which the LLM is imported. Defaults to + langchain.chat_models + llm_class_name: Name of the selected llm. Defaults to ChatOpenAI module: Module from which the LLM is imported. Defaults to langchain.llms - llm_name: Name of the selected llm llm_kwargs: Arguments of the selected llm produces: RAGAS metrics to compute. kwargs: Unhandled keyword arguments passed in by Fondant. 
""" self.llm = self.extract_llm( - module=module, - model_name=llm_name, - model_kwargs=llm_kwargs, + llm_module_name=llm_module_name, + llm_class_name=llm_class_name, + llm_kwargs=llm_kwargs, ) self.gpt_wrapper = LangchainLLM(llm=self.llm) self.metric_functions = self.extract_metric_functions( @@ -38,13 +40,16 @@ def __init__( # import the metric functions selected @staticmethod - def import_from(module, name): - module = __import__(module, fromlist=[name]) - return getattr(module, name) + def import_from(module_name: str, element_name: str): + module = __import__(module_name, fromlist=[element_name]) + return getattr(module, element_name) - def extract_llm(self, module, model_name, model_kwargs): - module = self.import_from(module, model_name) - return module(**model_kwargs) + def extract_llm(self, llm_module_name: str, llm_class_name: str, llm_kwargs: dict): + module = self.import_from( + module_name=llm_module_name, + element_name=llm_class_name, + ) + return module(**llm_kwargs) def extract_metric_functions(self, metrics: list): functions = [] diff --git a/components/filter_language/README.md b/components/filter_language/README.md index 369efc8e8..fb922247e 100644 --- a/components/filter_language/README.md +++ b/components/filter_language/README.md @@ -29,7 +29,7 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| language | str | A valid language code or identifier (e.g., "en", "fr", "de"). | en | +| language | | A valid language code or identifier (e.g., "en", "fr", "de"). | en | ## Usage diff --git a/components/index_aws_opensearch/src/main.py b/components/index_aws_opensearch/src/main.py index b04820e82..f1d7cd995 100644 --- a/components/index_aws_opensearch/src/main.py +++ b/components/index_aws_opensearch/src/main.py @@ -39,6 +39,9 @@ def __init__( ) self.create_index(index_body) + def teardown(self) -> None: + self.client.close() + def create_index(self, index_body: Dict[str, Any]): """Creates an index if not existing in AWS OpenSearch. diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py index e74019308..90fd202cb 100644 --- a/components/index_qdrant/src/main.py +++ b/components/index_qdrant/src/main.py @@ -47,6 +47,9 @@ def __init__( self.batch_size = batch_size self.parallelism = parallelism + def teardown(self) -> None: + self.client.close() + def write(self, dataframe: dd.DataFrame) -> None: """ Writes the data from the given Dask DataFrame to the Qdrant collection. 
diff --git a/components/index_weaviate/src/main.py b/components/index_weaviate/src/main.py index fb3a94f3a..9761b2351 100644 --- a/components/index_weaviate/src/main.py +++ b/components/index_weaviate/src/main.py @@ -53,6 +53,9 @@ def __init__( }, ) + def teardown(self) -> None: + del self.client + def write(self, dataframe: dd.DataFrame) -> None: with self.client.batch as batch: for part in tqdm( diff --git a/components/load_from_pdf/Dockerfile b/components/load_from_pdf/Dockerfile new file mode 100644 index 000000000..a0810612f --- /dev/null +++ b/components/load_from_pdf/Dockerfile @@ -0,0 +1,30 @@ +FROM --platform=linux/amd64 python:3.8-slim as base + +# System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# Install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Install Fondant +# This is split from other requirements to leverage caching +ARG FONDANT_VERSION=main +RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} + +# Set the working directory to the component folder +WORKDIR /component +COPY src/ src/ + +FROM base as test +COPY tests/ tests/ +RUN pip3 install --no-cache-dir -r tests/requirements.txt +RUN python -m pytest tests + +FROM base +COPY tests/ tests/ +WORKDIR /component/src +ENTRYPOINT ["fondant", "execute", "main"] + diff --git a/components/load_from_pdf/README.md b/components/load_from_pdf/README.md new file mode 100644 index 000000000..d257f3dc9 --- /dev/null +++ b/components/load_from_pdf/README.md @@ -0,0 +1,69 @@ +# Load from pdf + + +## Description +Load pdf data stored locally or remotely using the PyMuPDF library. + + + +## Inputs / outputs + + +### Consumes + + +**This component does not consume data.** + + + +### Produces +**This component produces:** + +- pdf_path: string +- file_name: string +- text: string + + + + +## Arguments + +The component takes the following arguments to alter its behavior: + +| argument | type | description | default | | -------- | ---- | ----------- | ------- | +| pdf_path | str | The path to a pdf file or a folder containing pdf files to load. Can be a local path or a remote path. If the path is remote, the loader class will be determined by the scheme of the path. | / | +| n_rows_to_load | int | Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale | / | +| index_column | str | Column to set index to in the load component, if not specified a default globally unique index will be set | / | +| n_partitions | int | Number of partitions of the dask dataframe. If not specified, the number of partitions will be equal to the number of CPU cores. Set to high values if the data is large and the pipeline is running out of memory. | / | + + +## Usage + +You can add this component to your pipeline using the following code: + +```python +from fondant.pipeline import Pipeline + + +pipeline = Pipeline(...) + +dataset = pipeline.read( + "load_from_pdf", + arguments={ + # Add arguments + # "pdf_path": , + # "n_rows_to_load": 0, + # "index_column": , + # "n_partitions": 0, + }, +) +``` + + +## Testing + +You can run the tests using docker with BuildKit. From this directory, run: ``` docker build .
--target test ``` diff --git a/components/load_from_pdf/fondant_component.yaml b/components/load_from_pdf/fondant_component.yaml new file mode 100644 index 000000000..d1ec61476 --- /dev/null +++ b/components/load_from_pdf/fondant_component.yaml @@ -0,0 +1,41 @@ +name: Load from pdf +description: | + Load pdf data stored locally or remotely using the PyMuPDF library. +image: fndnt/load_from_pdf:dev +tags: + - Data loading + +produces: + pdf_path: + type: string + file_name: + type: string + text: + type: string + +args: + pdf_path: + description: | + The path to a pdf file or a folder containing pdf files to load. + Can be a local path or a remote path. If the path is remote, the loader class will be + determined by the scheme of the path. + type: str + n_rows_to_load: + description: | + Optional argument that defines the number of rows to load. Useful for testing pipeline runs + on a small scale + type: int + default: None + index_column: + description: | + Column to set index to in the load component, if not specified a default globally unique + index will be set + type: str + default: None + n_partitions: + description: | + Number of partitions of the dask dataframe. If not specified, the number of partitions will + be equal to the number of CPU cores. Set to high values if the data is large and the pipeline + is running out of memory. + type: int + default: None diff --git a/components/load_from_pdf/requirements.txt b/components/load_from_pdf/requirements.txt new file mode 100644 index 000000000..9b0233e4f --- /dev/null +++ b/components/load_from_pdf/requirements.txt @@ -0,0 +1 @@ +PyMuPDF==1.23.8 \ No newline at end of file diff --git a/components/load_from_pdf/src/main.py b/components/load_from_pdf/src/main.py new file mode 100644 index 000000000..f088f45ff --- /dev/null +++ b/components/load_from_pdf/src/main.py @@ -0,0 +1,127 @@ +import logging +import os +import typing as t + +import dask.dataframe as dd +import fitz +import fsspec as fs +import pandas as pd +from fondant.component import DaskLoadComponent +from fondant.core.component_spec import OperationSpec + +logger = logging.getLogger(__name__) + + +class PDFReader(DaskLoadComponent): + def __init__( + self, + spec: OperationSpec, + *, + pdf_path: str, + n_rows_to_load: t.Optional[int] = None, + index_column: t.Optional[str] = None, + n_partitions: t.Optional[int] = None, + ) -> None: + """ + Args: + spec: the operation spec for the component + pdf_path: Path to the PDF file + n_rows_to_load: optional argument that defines the number of rows to load. + Useful for testing pipeline runs on a small scale. + index_column: Column to set index to in the load component, if not specified a default + globally unique index will be set. + n_partitions: Number of partitions of the dask dataframe. If not specified, the number + of partitions will be equal to the number of CPU cores. Set to high values if + the data is large and the pipeline is running out of memory.
+ """ + self.spec = spec + self.pdf_path = pdf_path + self.n_rows_to_load = n_rows_to_load + self.index_column = index_column + self.protocol = fs.utils.get_protocol(self.pdf_path) + self.fs, _ = fs.core.url_to_fs(self.pdf_path) + self.n_partitions = n_partitions if n_partitions is not None else os.cpu_count() + + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.index_column is None: + logger.info( + "Index column not specified, setting a globally unique index", + ) + + def _set_unique_index(dataframe: pd.DataFrame, partition_info=None): + """Function that sets a unique index based on the partition and row number.""" + dataframe["id"] = 1 + dataframe["id"] = ( + str(partition_info["number"]) + + "_" + + (dataframe.id.cumsum()).astype(str) + ) + dataframe.index = dataframe.pop("id") + return dataframe + + def _get_meta_df() -> pd.DataFrame: + meta_dict = {"id": pd.Series(dtype="object")} + for field_name, field in self.spec.inner_produces.items(): + meta_dict[field_name] = pd.Series( + dtype=pd.ArrowDtype(field.type.value), + ) + return pd.DataFrame(meta_dict).set_index("id") + + meta = _get_meta_df() + dask_df = dask_df.map_partitions(_set_unique_index, meta=meta) + else: + logger.info(f"Setting `{self.index_column}` as index") + dask_df = dask_df.set_index(self.index_column, drop=True) + + return dask_df + + def load_pdf_from_fs(self, file_path: str): + with self.fs.open(file_path, "rb") as pdf_file: + pdf_bytes = pdf_file.read() + + documents = fitz.open("pdf", pdf_bytes) + # get all text + text = "".join([document.get_text() for document in documents]) + documents.close() + + return text + + def process_pdf(self, row): + file_path = row["pdf_path"] + text = self.load_pdf_from_fs(file_path) + row["file_name"] = file_path.split("/")[-1] # Extracting filename + row["text"] = text + return row + + def load(self) -> dd.DataFrame: + try: + file_paths = self.fs.ls(self.pdf_path) + except NotADirectoryError: + file_paths = [self.pdf_path] + + file_paths = [ + file_path for file_path in file_paths if file_path.endswith(".pdf") + ] + + if self.n_rows_to_load is not None: + file_paths = file_paths[: self.n_rows_to_load] + + dask_df = dd.from_pandas( + pd.DataFrame({"pdf_path": file_paths}), + npartitions=self.n_partitions, + ) + + meta_dict = {} + for field_name, field in self.spec.inner_produces.items(): + meta_dict[field_name] = pd.Series( + dtype=pd.ArrowDtype(field.type.value), + ) + meta_dict = pd.DataFrame(meta_dict) + + dask_df = dask_df.map_partitions( + lambda part: part.apply(self.process_pdf, axis=1), + meta=meta_dict, + ) + + dask_df = self.set_df_index(dask_df) + return dask_df diff --git a/components/load_from_pdf/tests/component_test.py b/components/load_from_pdf/tests/component_test.py new file mode 100644 index 000000000..41c8eb66c --- /dev/null +++ b/components/load_from_pdf/tests/component_test.py @@ -0,0 +1,47 @@ +from pathlib import Path + +import yaml +from fondant.core.component_spec import ComponentSpec, OperationSpec + +from src.main import PDFReader + + +def test_pdf_reader(): + """Test the PDFReader component. + + The test runs the component both on a single local pdf file and on a folder containing + multiple pdf files.
+ """ + with open(Path(__file__).with_name("fondant_component.yaml")) as f: + print(f.name) + spec = ComponentSpec(yaml.safe_load(f)) + spec = OperationSpec(spec) + + pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"] + + for path in pdf_path: + component = PDFReader( + spec=spec, + pdf_path=path, + n_rows_to_load=None, + index_column=None, + ) + + output_dataframe = component.load().compute() + + assert output_dataframe.columns.tolist() == ["pdf_path", "file_name", "text"] + + if path == "tests/test_file/dummy.pdf": + assert output_dataframe.shape == (1, 3) + assert output_dataframe["file_name"].tolist() == ["dummy.pdf"] + assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"] + else: + assert output_dataframe.shape == (2, 3) + assert output_dataframe["file_name"].tolist() == [ + "dummy_2.pdf", + "dummy_1.pdf", + ] + assert output_dataframe["text"].tolist() == [ + "Dummy PDF file\n", + "Dummy PDF file\n", + ] diff --git a/components/load_from_pdf/tests/fondant_component.yaml b/components/load_from_pdf/tests/fondant_component.yaml new file mode 100644 index 000000000..b255587e9 --- /dev/null +++ b/components/load_from_pdf/tests/fondant_component.yaml @@ -0,0 +1,34 @@ +name: Load from pdf +description: | + Load pdf data stored locally or remotely using the PyMuPDF library. +image: fndnt/load_from_pdf:dev +tags: + - Data loading + +produces: + pdf_path: + type: string + file_name: + type: string + text: + type: string + +args: + pdf_path: + description: | + The path to a pdf file or a folder containing pdf files to load. + Can be a local path or a remote path. If the path is remote, the loader class will be + determined by the scheme of the path. + type: str + n_rows_to_load: + description: | + Optional argument that defines the number of rows to load.
Useful for testing pipeline runs + on a small scale + type: int + default: None + index_column: + description: | + Column to set index to in the load component, if not specified a default globally unique + index will be set + type: str + default: None diff --git a/components/load_from_pdf/tests/pytest.ini b/components/load_from_pdf/tests/pytest.ini new file mode 100644 index 000000000..bf6a8a517 --- /dev/null +++ b/components/load_from_pdf/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +pythonpath = ../src \ No newline at end of file diff --git a/components/load_from_pdf/tests/requirements.txt b/components/load_from_pdf/tests/requirements.txt new file mode 100644 index 000000000..2a929edcc --- /dev/null +++ b/components/load_from_pdf/tests/requirements.txt @@ -0,0 +1 @@ +pytest==7.4.2 diff --git a/components/load_from_pdf/tests/test_file/dummy.pdf b/components/load_from_pdf/tests/test_file/dummy.pdf new file mode 100644 index 000000000..774c2ea70 Binary files /dev/null and b/components/load_from_pdf/tests/test_file/dummy.pdf differ diff --git a/components/load_from_pdf/tests/test_folder/dummy_1.pdf b/components/load_from_pdf/tests/test_folder/dummy_1.pdf new file mode 100644 index 000000000..774c2ea70 Binary files /dev/null and b/components/load_from_pdf/tests/test_folder/dummy_1.pdf differ diff --git a/components/load_from_pdf/tests/test_folder/dummy_2.pdf b/components/load_from_pdf/tests/test_folder/dummy_2.pdf new file mode 100644 index 000000000..774c2ea70 Binary files /dev/null and b/components/load_from_pdf/tests/test_folder/dummy_2.pdf differ diff --git a/components/retrieve_from_weaviate/src/main.py b/components/retrieve_from_weaviate/src/main.py index 7bd3fe9dc..a1de66b11 100644 --- a/components/retrieve_from_weaviate/src/main.py +++ b/components/retrieve_from_weaviate/src/main.py @@ -24,6 +24,9 @@ def __init__( self.class_name = class_name self.k = top_k + def teardown(self) -> None: + del self.client + def retrieve_chunks(self, vector_query: list): """Get results from weaviate database.""" result = ( diff --git a/components/segment_images/README.md b/components/segment_images/README.md index 2ba49c19a..45d0f0945 100644 --- a/components/segment_images/README.md +++ b/components/segment_images/README.md @@ -31,8 +31,8 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | -| model_id | str | id of the model on the Hugging Face hub | openmmlab/upernet-convnext-small | -| batch_size | int | batch size to use | 8 | +| model_id | | id of the model on the Hugging Face hub | openmmlab/upernet-convnext-small | +| batch_size | | batch size to use | 8 | ## Usage diff --git a/docs/components/component_spec.md b/docs/components/component_spec.md index 9299305a2..65e121c18 100644 --- a/docs/components/component_spec.md +++ b/docs/components/component_spec.md @@ -301,4 +301,39 @@ class ExampleComponent(PandasTransformComponent): Returns: A pandas dataframe containing the transformed data """ +``` + +Afterwards, we pass all keyword arguments to the `__init__()` method of the component. + + +You can also use the `teardown()` method to perform any cleanup after the component has been executed. +This is a good place to close any open connections or files.
+ +```python +import pandas as pd +from fondant.component import PandasTransformComponent +from my_library import Client + + +class ExampleComponent(PandasTransformComponent): + + def __init__(self, *, client_url, **kwargs) -> None: + """ + Args: + client_url: An argument passed to the component + """ + # Initialize your component here based on the arguments + self.client = Client(client_url) + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + """Implement your custom logic in this single method + + Args: + dataframe: A Pandas dataframe containing the data + + Returns: + A pandas dataframe containing the transformed data + """ + + def teardown(self): + """Perform any cleanup after the component has been executed + """ + self.client.shutdown() ``` \ No newline at end of file diff --git a/src/fondant/component/component.py b/src/fondant/component/component.py index 82d539b84..7d9a86113 100644 --- a/src/fondant/component/component.py +++ b/src/fondant/component/component.py @@ -26,6 +26,9 @@ def __init__( ): pass + def teardown(self) -> None: + """Method called after the component has been executed.""" + class DaskLoadComponent(BaseComponent): """Component that loads data and returns a Dask DataFrame.""" diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py index 3026eb625..db3140703 100644 --- a/src/fondant/component/executor.py +++ b/src/fondant/component/executor.py @@ -335,7 +335,7 @@ def _run_execution( input_manifest: Manifest, ) -> Manifest: logging.info("Executing component") - component = component_cls( + component: Component = component_cls( consumes=self.operation_spec.inner_consumes, produces=self.operation_spec.inner_produces, **self.user_arguments, @@ -350,6 +350,8 @@ def _run_execution( ) self._write_data(dataframe=output_df, manifest=output_manifest) + component.teardown() + return output_manifest def execute(self, component_cls: t.Type[Component]) -> None: diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index 4143f389a..5560b9aab 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -25,3 +25,7 @@ class InvalidTypeSchema(ValidationError, FondantException): class UnsupportedTypeAnnotation(FondantException): """Thrown when an unsupported type annotation is encountered during type inference.""" + + +class PipelineRunError(ValidationError, FondantException): + """Thrown when a pipeline run results in an error.""" diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index 0ce3ea568..62b158c1c 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -6,6 +6,7 @@ import yaml +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.compiler import ( DockerCompiler, @@ -38,15 +39,23 @@ def _run(self, input_spec: str, *args, **kwargs): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ] print("Starting pipeline run...") # copy the current environment with the DOCKER_DEFAULT_PLATFORM argument - subprocess.check_call( # nosec + output = subprocess.run( # nosec cmd, env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) + + if output.returncode != 0: + msg = f"Command failed with error: '{output.stderr}'" + raise PipelineRunError(msg) + print("Finished pipeline run.") def run( @@ -55,12 +64,11 @@ def run( self, input, *, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, - ) -> None: + ): """Run a pipeline, either from
a compiled docker-compose spec or from a fondant pipeline. Args: input: the pipeline to compile or a path to a already compiled docker-compose spec - output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) to mount in the docker-compose spec. @@ -258,8 +266,6 @@ def run( pipeline_name: the name of the pipeline to create role_arn: the Amazon Resource Name role to use for the processing steps, if none provided the `sagemaker.get_execution_role()` role will be used. - instance_type: the instance type to use for the processing steps - (see: https://aws.amazon.com/ec2/instance-types/ for options). """ if isinstance(input, Pipeline): os.makedirs(".fondant", exist_ok=True) diff --git a/tests/component/test_component.py b/tests/component/test_component.py index 397ab210e..191e6e329 100644 --- a/tests/component/test_component.py +++ b/tests/component/test_component.py @@ -298,6 +298,69 @@ def load(self): load.mock.assert_called_once() +@pytest.mark.usefixtures("_patched_data_writing") +def test_teardown_method(metadata): + # Mock CLI arguments load + operation_spec = OperationSpec( + ComponentSpec.from_file(components_path / "component.yaml"), + ) + + sys.argv = [ + "", + "--metadata", + metadata.to_json(), + "--flag", + "success", + "--value", + "1", + "--output_manifest_path", + str(components_path / "output_manifest.json"), + "--operation_spec", + operation_spec.to_json(), + "--cache", + "False", + "--produces", + "{}", + ] + + class MockClient: + def __init__(self): + self.is_connected = True + + def shutdown(self): + if self.is_connected: + self.is_connected = False + + client = MockClient() + + class MyLoadComponent(DaskLoadComponent): + def __init__(self, *, flag, value, **kwargs): + self.flag = flag + self.value = value + self.client = client + + def load(self): + data = { + "id": [0, 1], + "captions_data": ["hello world", "this is another caption"], + } + return dd.DataFrame.from_dict(data, npartitions=N_PARTITIONS) + + def teardown(self) -> None: + self.client.shutdown() + + executor_factory = ExecutorFactory(MyLoadComponent) + executor = executor_factory.get_executor() + assert executor.input_partition_rows is None + + teardown = patch_method_class(MyLoadComponent.teardown) + assert client.is_connected is True + with mock.patch.object(MyLoadComponent, "teardown", teardown): + executor.execute(MyLoadComponent) + teardown.mock.assert_called_once() + assert client.is_connected is False + + @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") def test_dask_transform_component(metadata): operation_spec = OperationSpec( diff --git a/tests/examples/__init__.py b/tests/examples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index cdec89989..be7c736af 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -5,6 +5,7 @@ from unittest import mock import pytest +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.runner import ( DockerRunner, @@ -22,11 +23,23 @@ ) -def test_docker_runner(): +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + +def test_docker_runner(mock_subprocess_run): """Test that the docker 
runner while mocking subprocess.call.""" - with mock.patch("subprocess.check_call") as mock_call: + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run("some/path") - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -37,15 +50,19 @@ def test_docker_runner(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_docker_runner_from_pipeline(): - with mock.patch("subprocess.check_call") as mock_call: +def test_docker_runner_from_pipeline(mock_subprocess_run): + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run(PIPELINE) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -56,11 +73,25 @@ def test_docker_runner_from_pipeline(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) +def test_invalid_docker_run(): + """Test that the docker runner throws the correct error.""" + spec_path = "some/path" + resolved_spec_path = str(Path(spec_path).resolve()) + with pytest.raises( + PipelineRunError, + match=f"stat {resolved_spec_path}: no such file or directory", + ): + DockerRunner().run(spec_path) + + class MockKfpClient: def __init__(self, host): self.host = host diff --git a/tests/test_cli.py b/tests/test_cli.py index fc73915cc..23a4e2d0c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -44,6 +44,17 @@ def load(self): pass +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + @pytest.mark.parametrize("command", commands) def test_basic_invocation(command): """Test that the CLI (sub)commands can be invoked without errors.""" @@ -262,7 +273,7 @@ def test_sagemaker_compile(tmp_path_factory): ) -def test_local_run(): +def test_local_run(mock_subprocess_run): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, @@ -275,9 +286,11 @@ def test_local_run(): extra_volumes=[], build_arg=[], ) - with patch("subprocess.check_call") as mock_call: + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run run_local(args) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -288,11 +301,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) - with patch("subprocess.check_call") as mock_call: + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run args1 = argparse.Namespace( local=True, vertex=False, @@ -306,7 +323,7 @@ def test_local_run(): credentials=None, ) run_local(args1) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -317,12 +334,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_local_run_cloud_credentials(): +def 
test_local_run_cloud_credentials(mock_subprocess_run): namespace_creds_kwargs = [ {"auth_gcp": True, "auth_azure": False, "auth_aws": False}, {"auth_gcp": False, "auth_azure": True, "auth_aws": False}, @@ -333,8 +353,10 @@ def test_local_run_cloud_credentials(): with patch( "fondant.pipeline.compiler.DockerCompiler.compile", ) as mock_compiler, patch( - "subprocess.check_call", + "subprocess.run", ) as mock_runner: + mock_runner.side_effect = mock_subprocess_run + args = argparse.Namespace( local=True, vertex=False, @@ -360,7 +382,6 @@ def test_local_run_cloud_credentials(): output_path=".fondant/compose.yaml", build_args=[], ) - mock_runner.assert_called_once_with( [ "docker", @@ -372,8 +393,11 @@ def test_local_run_cloud_credentials(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", )
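Reviewer note: with the switch from `subprocess.check_call` to `subprocess.run` plus the `returncode` check above, a failing `docker compose` invocation now surfaces as a `PipelineRunError` carrying the captured stderr, which is what the new `test_invalid_docker_run` exercises. A small usage sketch of how a caller might handle this (the compose spec path is hypothetical):

```python
from fondant.core.exceptions import PipelineRunError
from fondant.pipeline.runner import DockerRunner

try:
    # --abort-on-container-exit stops the run on the first failing container;
    # stderr from the docker compose process ends up in the exception message.
    DockerRunner().run("path/to/docker-compose.yaml")
except PipelineRunError as err:
    print(f"Pipeline run failed: {err}")
```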