From 71055d0143a01b593c3bfe5b3ff0bbe455bd47aa Mon Sep 17 00:00:00 2001
From: Matthias Richter
Date: Thu, 9 Nov 2023 14:58:31 +0100
Subject: [PATCH] Add simple pipeline and integration test for the LocalRunner
 (#594)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Georges Lorré <35808396+GeorgesLorre@users.noreply.github.com>
---
 .../components/dummy_component/Dockerfile      | 24 +++++
 .../components/dummy_component/README.md       | 56 ++++++++++++
 .../dummy_component/fondant_component.yaml     | 16 ++++
 .../dummy_component/requirements.txt           |  1 +
 .../components/dummy_component/src/main.py     | 24 +++++
 .../load_from_parquet/fondant_component.yaml   | 25 ++++++
 .../sample_pipeline_test/data/sample.parquet   | Bin 0 -> 2165 bytes
 tests/test_sample_pipeline.py                  | 82 ++++++++++++++++++
 8 files changed, 228 insertions(+)
 create mode 100644 tests/sample_pipeline_test/components/dummy_component/Dockerfile
 create mode 100644 tests/sample_pipeline_test/components/dummy_component/README.md
 create mode 100644 tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml
 create mode 100644 tests/sample_pipeline_test/components/dummy_component/requirements.txt
 create mode 100644 tests/sample_pipeline_test/components/dummy_component/src/main.py
 create mode 100644 tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml
 create mode 100644 tests/sample_pipeline_test/data/sample.parquet
 create mode 100644 tests/test_sample_pipeline.py

diff --git a/tests/sample_pipeline_test/components/dummy_component/Dockerfile b/tests/sample_pipeline_test/components/dummy_component/Dockerfile
new file mode 100644
index 000000000..c39ada80e
--- /dev/null
+++ b/tests/sample_pipeline_test/components/dummy_component/Dockerfile
@@ -0,0 +1,24 @@
+FROM --platform=linux/amd64 python:3.8-slim as base
+
+# System dependencies
+RUN apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install git -y
+
+# Install requirements
+COPY requirements.txt /
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+# Install Fondant
+# This is split from other requirements to leverage caching
+ARG FONDANT_VERSION=main
+RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
+
+# Set the working directory to the component folder
+WORKDIR /component
+COPY src/ src/
+ENV PYTHONPATH "${PYTHONPATH}:./src"
+
+FROM base
+WORKDIR /component/src
+ENTRYPOINT ["fondant", "execute", "main"]
\ No newline at end of file
diff --git a/tests/sample_pipeline_test/components/dummy_component/README.md b/tests/sample_pipeline_test/components/dummy_component/README.md
new file mode 100644
index 000000000..97b3309e0
--- /dev/null
+++ b/tests/sample_pipeline_test/components/dummy_component/README.md
@@ -0,0 +1,56 @@
+# Chunk text
+
+### Description
+Component that chunks text into smaller segments
+
+This component takes a body of text and chunks it into smaller chunks. The id of the returned dataset
+consists of the id of the original document followed by the chunk index.
+
+
+### Inputs / outputs
+
+**This component consumes:**
+
+- text
+  - data: string
+
+**This component produces:**
+
+- text
+  - data: string
+  - original_document_id: string
+
+### Arguments
+
+The component takes the following arguments to alter its behavior:
+
+| argument | type | description | default |
+| -------- | ---- | ----------- | ------- |
+| chunk_size | int | Maximum size of chunks to return | / |
+| chunk_overlap | int | Overlap in characters between chunks | / |
+
+### Usage
+
+You can add this component to your pipeline using the following code:
+
+```python
+from fondant.pipeline import ComponentOp
+
+
+chunk_text_op = ComponentOp.from_registry(
+    name="chunk_text",
+    arguments={
+        # Add arguments
+        # "chunk_size": 0,
+        # "chunk_overlap": 0,
+    }
+)
+pipeline.add_op(chunk_text_op, dependencies=[...])  # Add previous component as dependency
+```
+
+### Testing
+
+You can run the tests using Docker with BuildKit. From this directory, run:
+```
+docker build . --target test
+```
diff --git a/tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml b/tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml
new file mode 100644
index 000000000..1091703eb
--- /dev/null
+++ b/tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml
@@ -0,0 +1,16 @@
+name: Dummy component
+description: Dummy component for testing custom components
+
+image: fndnt/dummy_component:dev
+
+consumes:
+  text:
+    fields:
+      data:
+        type: string
+
+produces:
+  text:
+    fields:
+      data:
+        type: string
\ No newline at end of file
diff --git a/tests/sample_pipeline_test/components/dummy_component/requirements.txt b/tests/sample_pipeline_test/components/dummy_component/requirements.txt
new file mode 100644
index 000000000..27140199e
--- /dev/null
+++ b/tests/sample_pipeline_test/components/dummy_component/requirements.txt
@@ -0,0 +1 @@
+langchain==0.0.315
\ No newline at end of file
diff --git a/tests/sample_pipeline_test/components/dummy_component/src/main.py b/tests/sample_pipeline_test/components/dummy_component/src/main.py
new file mode 100644
index 000000000..bf0ddedcd
--- /dev/null
+++ b/tests/sample_pipeline_test/components/dummy_component/src/main.py
@@ -0,0 +1,24 @@
+"""
+Component that chunks text into smaller segments.
+
+This component takes a body of text and chunks it into smaller chunks. The id of the returned dataset
+consists of the id of the original document followed by the chunk index.
+
+"""
+import logging
+
+import pandas as pd
+from fondant.component import PandasTransformComponent
+
+logger = logging.getLogger(__name__)
+
+
+class DummyComponent(PandasTransformComponent):
+    """Dummy component that returns the dataframe as it is."""
+
+    def __init__(self, *_):
+        pass
+
+    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
+        """Dummy component that returns the dataframe as it is."""
+        return dataframe
diff --git a/tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml b/tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml
new file mode 100644
index 000000000..35c43aadb
--- /dev/null
+++ b/tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,25 @@
+name: Load from parquet
+description: Component that loads a dataset from a parquet uri
+image: fndnt/load_from_parquet:dev
+
+produces:
+  text:
+    fields:
+      data:
+        type: string
+
+args:
+  dataset_uri:
+    description: The remote path to the parquet file/folder containing the dataset
+    type: str
+  column_name_mapping:
+    description: Mapping of the consumed dataset
+    type: dict
+    default: {}
+  n_rows_to_load:
+    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
+    type: int
+  index_column:
+    description: Column to set the index to in the load component; if not specified, a default globally unique index will be set
+    type: str
+    default: None
\ No newline at end of file
diff --git a/tests/sample_pipeline_test/data/sample.parquet b/tests/sample_pipeline_test/data/sample.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..3b56f832bf11d1858a6a14caeb60ee1ef54a9efa
GIT binary patch
literal 2165
[base85-encoded binary data omitted]

diff --git a/tests/test_sample_pipeline.py b/tests/test_sample_pipeline.py
new file mode 100644
--- /dev/null
+++ b/tests/test_sample_pipeline.py
@@ -0,0 +1,82 @@
+import glob
+import os
+from pathlib import Path
+
+import pytest
+from fondant.compiler import DockerCompiler
+from fondant.pipeline import ComponentOp, Pipeline
+from fondant.runner import DockerRunner
+
+BASE_PATH = Path("./tests/sample_pipeline_test")
+NUMBER_OF_COMPONENTS = 3
+
+
+@pytest.fixture()
+def sample_pipeline(data_dir="./data") -> Pipeline:
+    # Define pipeline
+    pipeline = Pipeline(pipeline_name="dummy-pipeline", base_path=data_dir)
+
+    # Load from hub component
+    load_component_column_mapping = {
+        "text": "text_data",
+    }
+
+    load_from_file = ComponentOp(
+        component_dir=Path(BASE_PATH / "components" / "load_from_parquet"),
+        arguments={
+            "dataset_uri": "/data/sample.parquet",
+            "column_name_mapping": load_component_column_mapping,
+            "n_rows_to_load": 5,
+        },
+    )
+
+    custom_dummy_component = ComponentOp(
+        component_dir=Path(BASE_PATH / "components" / "dummy_component"),
+    )
+
+    chunk_text = ComponentOp.from_registry(
+        name="chunk_text",
+        arguments={"chunk_size": 10, "chunk_overlap": 2},
+    )
+
+    # Add components to the pipeline
+    pipeline.add_op(load_from_file)
+    pipeline.add_op(custom_dummy_component, dependencies=load_from_file)
+    pipeline.add_op(chunk_text, dependencies=[custom_dummy_component])
+
+    return pipeline
+
+
+def test_local_runner(sample_pipeline, tmp_path_factory):
+    with tmp_path_factory.mktemp("temp") as data_dir:
+        sample_pipeline.base_path = str(data_dir)
+        DockerCompiler().compile(
+            sample_pipeline,
+            output_path="docker-compose.yaml",
+            extra_volumes=[
+                str(Path("tests/sample_pipeline_test/data").resolve()) + ":/data",
+            ],
+        )
+        DockerRunner().run("docker-compose.yaml")
+
+        assert os.path.exists(data_dir / "dummy-pipeline")
+        assert os.path.exists(data_dir / "dummy-pipeline" / "cache")
+        pipeline_dirs = glob.glob(
+            str(data_dir / "dummy-pipeline" / "dummy-pipeline-*" / "*"),
+        )
+
+        assert len(pipeline_dirs) == NUMBER_OF_COMPONENTS
+        for dir in pipeline_dirs:
+            assert os.path.exists(Path(dir) / "index")
+            assert os.path.exists(Path(dir) / "text")
+            assert os.path.exists(Path(dir) / "manifest.json")
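
For reference, the test above exercises the complete LocalRunner flow: build a `Pipeline`, compile it to a docker-compose spec with `DockerCompiler`, mount the sample data as a volume, execute it with `DockerRunner`, and assert on the produced manifests. The sketch below replays that flow outside pytest. It is a minimal sketch, not part of the patch: it reuses only calls visible in the test (`Pipeline`, `ComponentOp`, `DockerCompiler.compile`, `DockerRunner.run`), drops the registry-based `chunk_text` component so nothing is pulled from the component hub, and assumes the `fondant.compiler`/`fondant.runner` import paths, a local Docker daemon, and the repository root as the working directory.

```python
# Minimal sketch (not part of the patch): compile and run the sample pipeline
# with the local Docker runner. Import paths are an assumption based on the
# Fondant version targeted by this patch.
import tempfile
from pathlib import Path

from fondant.compiler import DockerCompiler
from fondant.pipeline import ComponentOp, Pipeline
from fondant.runner import DockerRunner

BASE_PATH = Path("./tests/sample_pipeline_test")

with tempfile.TemporaryDirectory() as data_dir:
    pipeline = Pipeline(pipeline_name="dummy-pipeline", base_path=data_dir)

    # Load component: reads /data/sample.parquet, mounted below via extra_volumes.
    load_from_file = ComponentOp(
        component_dir=BASE_PATH / "components" / "load_from_parquet",
        arguments={
            "dataset_uri": "/data/sample.parquet",
            "column_name_mapping": {"text": "text_data"},
            "n_rows_to_load": 5,
        },
    )
    dummy = ComponentOp(component_dir=BASE_PATH / "components" / "dummy_component")

    pipeline.add_op(load_from_file)
    pipeline.add_op(dummy, dependencies=load_from_file)

    # Compile the pipeline to a docker-compose spec, then execute it locally.
    DockerCompiler().compile(
        pipeline,
        output_path="docker-compose.yaml",
        extra_volumes=[str((BASE_PATH / "data").resolve()) + ":/data"],
    )
    DockerRunner().run("docker-compose.yaml")
```

Running `pytest tests/test_sample_pipeline.py` executes the same flow end to end, including the `chunk_text` registry component and the assertions on the produced `index`, `text`, and `manifest.json` outputs.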