Add simple pipeline and integration test for the LocalRunner (#594)
Co-authored-by: Georges Lorré <[email protected]>
mrchtr and GeorgesLorre authored Nov 9, 2023
1 parent 5d688d2 commit 71055d0
Showing 8 changed files with 228 additions and 0 deletions.
24 changes: 24 additions & 0 deletions tests/sample_pipeline_test/components/dummy_component/Dockerfile
@@ -0,0 +1,24 @@
FROM --platform=linux/amd64 python:3.8-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/
ENV PYTHONPATH "${PYTHONPATH}:./src"

FROM base
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]
56 changes: 56 additions & 0 deletions tests/sample_pipeline_test/components/dummy_component/README.md
@@ -0,0 +1,56 @@
# Chunk text

### Description
Component that chunks text into smaller segments.

This component takes a body of text and chunks it into smaller segments. The id of each row
in the returned dataset consists of the id of the original document followed by the chunk index.


### Inputs / outputs

**This component consumes:**

- text
  - data: string

**This component produces:**

- text
  - data: string
  - original_document_id: string

### Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| chunk_size | int | Maximum size of chunks to return | / |
| chunk_overlap | int | Overlap in characters between chunks | / |

### Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import ComponentOp


chunk_text_op = ComponentOp.from_registry(
    name="chunk_text",
    arguments={
        # Add arguments
        # "chunk_size": 0,
        # "chunk_overlap": 0,
    }
)
pipeline.add_op(chunk_text_op, dependencies=[...])  # Add previous component as dependency
```
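
To execute a pipeline containing this component on your own machine, you can compile it to a
docker-compose spec and run it with the Docker runner, mirroring the calls used in the
integration test in this commit. A minimal sketch, assuming `pipeline` is the `Pipeline`
object the component was added to:

```python
from fondant.pipeline.compiler import DockerCompiler
from fondant.pipeline.runner import DockerRunner

# Compile the pipeline to a docker-compose spec, then execute it locally.
DockerCompiler().compile(pipeline, output_path="docker-compose.yaml")
DockerRunner().run("docker-compose.yaml")
```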

### Testing

You can run the tests using Docker with BuildKit. From this directory, run:
```
docker build . --target test
```
16 changes: 16 additions & 0 deletions tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml
@@ -0,0 +1,16 @@
name: Dummy component
description: Dummy component for testing custom components

image: fndnt/dummy_component:dev

consumes:
  text:
    fields:
      data:
        type: string

produces:
  text:
    fields:
      data:
        type: string
1 change: 1 addition & 0 deletions tests/sample_pipeline_test/components/dummy_component/requirements.txt
@@ -0,0 +1 @@
langchain==0.0.315
24 changes: 24 additions & 0 deletions tests/sample_pipeline_test/components/dummy_component/src/main.py
@@ -0,0 +1,24 @@
"""
Component that chunks text into smaller segments.
This component takes a body of text and chunks into small chunks. The id of the returned dataset
consists of the id of the original document followed by the chunk index.
"""
import logging

import pandas as pd
from fondant.component import PandasTransformComponent

logger = logging.getLogger(__name__)


class DummyComponent(PandasTransformComponent):
"""Dummy component that returns the dataframe as it is."""

def __init__(self, *_):
pass

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""Dummy component that returns the dataframe as it is."""
return dataframe
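
Because `transform` is a pure function on a pandas `DataFrame`, the component can be
sanity-checked without compiling a pipeline. A minimal sketch; the sample data and the
import path are illustrative assumptions, not part of this commit:

```python
import pandas as pd

from main import DummyComponent  # assumes src/ is on PYTHONPATH, as in the Dockerfile

# A tiny frame mimicking the "text" subset the component consumes.
dataframe = pd.DataFrame({"data": ["hello world", "foo bar"]})

component = DummyComponent()
result = component.transform(dataframe)

# The dummy component is a pass-through, so the output equals the input.
assert result.equals(dataframe)
```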
25 changes: 25 additions & 0 deletions tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,25 @@
name: Load from parquet
description: Component that loads a dataset from a parquet uri
image: fndnt/load_from_parquet:dev

produces:
  text:
    fields:
      data:
        type: string

args:
  dataset_uri:
    description: The remote path to the parquet file/folder containing the dataset
    type: str
  column_name_mapping:
    description: Mapping of the consumed dataset
    type: dict
    default: {}
  n_rows_to_load:
    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
    type: int
  index_column:
    description: Column to set index to in the load component, if not specified a default globally unique index will be set
    type: str
    default: None
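
The arguments declared above map one-to-one onto the `arguments` dict of a `ComponentOp`.
A minimal sketch with placeholder values, following the same pattern as the integration
test below:

```python
from pathlib import Path

from fondant.pipeline import ComponentOp

load_op = ComponentOp(
    component_dir=Path("tests/sample_pipeline_test/components/load_from_parquet"),
    arguments={
        "dataset_uri": "/data/sample.parquet",          # path to the parquet data
        "column_name_mapping": {"text": "text_data"},   # rename source columns
        "n_rows_to_load": 5,                            # keep test runs small
    },
)
```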
Binary file added tests/sample_pipeline_test/data/sample.parquet
82 changes: 82 additions & 0 deletions tests/test_sample_pipeline.py
@@ -0,0 +1,82 @@
# This file contains a sample pipeline: it loads data from a parquet file with
# the load_from_parquet component, chains a custom dummy component, and uses
# the reusable chunking component.
import glob
import logging
import os
from pathlib import Path

import pytest
from fondant.pipeline import ComponentOp, Pipeline
from fondant.pipeline.compiler import DockerCompiler
from fondant.pipeline.runner import DockerRunner

logger = logging.getLogger(__name__)

# TODO: probably removable after we have solved #344
# Workaround to make the test executable on M1 MacBooks.
os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64"

BASE_PATH = Path("./tests/sample_pipeline_test")
NUMBER_OF_COMPONENTS = 3


@pytest.fixture()
def sample_pipeline(data_dir="./data") -> Pipeline:
    # Define pipeline
    pipeline = Pipeline(pipeline_name="dummy-pipeline", base_path=data_dir)

    # Load from hub component
    load_component_column_mapping = {
        "text": "text_data",
    }

    load_from_file = ComponentOp(
        component_dir=Path(BASE_PATH / "components" / "load_from_parquet"),
        arguments={
            "dataset_uri": "/data/sample.parquet",
            "column_name_mapping": load_component_column_mapping,
            "n_rows_to_load": 5,
        },
    )

    custom_dummy_component = ComponentOp(
        component_dir=Path(BASE_PATH / "components" / "dummy_component"),
    )

    chunk_text = ComponentOp.from_registry(
        name="chunk_text",
        arguments={"chunk_size": 10, "chunk_overlap": 2},
    )

    # Add components to the pipeline
    pipeline.add_op(load_from_file)
    pipeline.add_op(custom_dummy_component, dependencies=load_from_file)
    pipeline.add_op(chunk_text, dependencies=[custom_dummy_component])

    return pipeline


def test_local_runner(sample_pipeline, tmp_path_factory):
    with tmp_path_factory.mktemp("temp") as data_dir:
        sample_pipeline.base_path = str(data_dir)
        DockerCompiler().compile(
            sample_pipeline,
            output_path="docker-compose.yaml",
            extra_volumes=[
                str(Path("tests/sample_pipeline_test/data").resolve()) + ":/data",
            ],
        )
        DockerRunner().run("docker-compose.yaml")

        assert os.path.exists(data_dir / "dummy-pipeline")
        assert os.path.exists(data_dir / "dummy-pipeline" / "cache")
        pipeline_dirs = glob.glob(
            str(data_dir / "dummy-pipeline" / "dummy-pipeline-*" / "*"),
        )

        assert len(pipeline_dirs) == NUMBER_OF_COMPONENTS
        for pipeline_dir in pipeline_dirs:
            assert os.path.exists(Path(pipeline_dir) / "index")
            assert os.path.exists(Path(pipeline_dir) / "text")
            assert os.path.exists(Path(pipeline_dir) / "manifest.json")
