From d011613591b7fde3758a57889bc545a9409fba5d Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 11:32:47 +0100 Subject: [PATCH 01/22] Move sample pipeline to separate example folder and add execution in ci/cd pipeline --- .github/workflows/pipeline.yaml | 17 ++++ examples/sample_pipeline.py | 41 +++++++++ .../components/dummy_component/Dockerfile | 0 .../components/dummy_component/README.md | 0 .../dummy_component/fondant_component.yaml | 0 .../dummy_component/requirements.txt | 0 .../components/dummy_component/src/main.py | 0 .../sample_pipeline_test/data/sample.parquet | Bin scripts/run_sample_pipeline.sh | 21 +++++ src/fondant/pipeline/pipeline.py | 3 +- .../load_from_parquet/fondant_component.yaml | 23 ----- .../integration_tests/test_sample_pipeline.py | 79 ------------------ 12 files changed, 81 insertions(+), 103 deletions(-) create mode 100644 examples/sample_pipeline.py rename {tests/integration_tests => examples}/sample_pipeline_test/components/dummy_component/Dockerfile (100%) rename {tests/integration_tests => examples}/sample_pipeline_test/components/dummy_component/README.md (100%) rename {tests/integration_tests => examples}/sample_pipeline_test/components/dummy_component/fondant_component.yaml (100%) rename {tests/integration_tests => examples}/sample_pipeline_test/components/dummy_component/requirements.txt (100%) rename {tests/integration_tests => examples}/sample_pipeline_test/components/dummy_component/src/main.py (100%) rename {tests/integration_tests => examples}/sample_pipeline_test/data/sample.parquet (100%) create mode 100644 scripts/run_sample_pipeline.sh delete mode 100644 tests/integration_tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml delete mode 100644 tests/integration_tests/test_sample_pipeline.py diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 11deb2329..f14f4aeaf 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -32,6 +32,23 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COVERALLS_FLAG_NAME: test-${{ matrix.python-version }} + integration-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v1 + with: + python-version: 3.9 + - name: Install dependencies + run: | + pip install --upgrade pip + pip install poetry==1.4.0 + poetry install --all-extras --with test + + - name: Execute sample pipeline + run: ./scripts//run_sample_pipeline.sh + finish-coveralls: needs: test runs-on: ubuntu-latest diff --git a/examples/sample_pipeline.py b/examples/sample_pipeline.py new file mode 100644 index 000000000..17141b96d --- /dev/null +++ b/examples/sample_pipeline.py @@ -0,0 +1,41 @@ +# This file contains a sample pipeline. Loading data from a parquet file, +# using the load_from_parquet component, chain a custom dummy component, and use +# the reusable chunking component +import glob +import logging +import os +import pyarrow as pa +from pathlib import Path +from fondant.pipeline import Pipeline + +os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64" +BASE_PATH = Path("./examples/sample_pipeline_test") +NUMBER_OF_COMPONENTS = 3 + +# Define pipeline +pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH / ".artifacts")) + +# Load from hub component +load_component_column_mapping = { + "text": "text_data", +} + +dataset = pipeline.read( + name_or_path="load_from_parquet", + arguments={ + "dataset_uri": "/data/sample.parquet", + "column_name_mapping": load_component_column_mapping, + "n_rows_to_load": 5, + }, + produces={"text_data": pa.string()}, +) + +dataset = dataset.apply( + name_or_path=Path(BASE_PATH / "components" / "dummy_component"), +) + +dataset.apply( + name_or_path="chunk_text", + arguments={"chunk_size": 10, "chunk_overlap": 2}, + consumes={"text": "text_data"}, +) diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/Dockerfile b/examples/sample_pipeline_test/components/dummy_component/Dockerfile similarity index 100% rename from tests/integration_tests/sample_pipeline_test/components/dummy_component/Dockerfile rename to examples/sample_pipeline_test/components/dummy_component/Dockerfile diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md b/examples/sample_pipeline_test/components/dummy_component/README.md similarity index 100% rename from tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md rename to examples/sample_pipeline_test/components/dummy_component/README.md diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml b/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml similarity index 100% rename from tests/integration_tests/sample_pipeline_test/components/dummy_component/fondant_component.yaml rename to examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/requirements.txt b/examples/sample_pipeline_test/components/dummy_component/requirements.txt similarity index 100% rename from tests/integration_tests/sample_pipeline_test/components/dummy_component/requirements.txt rename to examples/sample_pipeline_test/components/dummy_component/requirements.txt diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/src/main.py b/examples/sample_pipeline_test/components/dummy_component/src/main.py similarity index 100% rename from tests/integration_tests/sample_pipeline_test/components/dummy_component/src/main.py rename to examples/sample_pipeline_test/components/dummy_component/src/main.py diff --git a/tests/integration_tests/sample_pipeline_test/data/sample.parquet b/examples/sample_pipeline_test/data/sample.parquet similarity index 100% rename from tests/integration_tests/sample_pipeline_test/data/sample.parquet rename to examples/sample_pipeline_test/data/sample.parquet diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh new file mode 100644 index 000000000..7eb847c45 --- /dev/null +++ b/scripts/run_sample_pipeline.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# This script executes the sample pipeline in the example folder, checks the correct execution and +# cleans up the directory again + +cleanup() { + # Create a temporary directory + artifact_directory="./examples/sample_pipeline_test/.artifacts" + # Check if the temporary directory exists before attempting to remove it + if [ -d "$artifact_directory" ]; then + rm -r "$artifact_directory" + echo "Temporary directory removed" + fi +} + +trap cleanup EXIT + +echo "Temporary directory created: $artifact_directory" +fondant run local examples/sample_pipeline.py + + + diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py index 4a301b46d..210676d0b 100644 --- a/src/fondant/pipeline/pipeline.py +++ b/src/fondant/pipeline/pipeline.py @@ -491,7 +491,8 @@ def _validate_pipeline_definition(self, run_id: str): msg = ( f"Component '{component_op.name}' is trying to invoke the field " f"'{component_field_name}', which has not been defined or created " - f"in the previous components." + f"in the previous components. \n" + f"Available field names: {list(manifest.fields.keys())}" ) raise InvalidPipelineDefinition( msg, diff --git a/tests/integration_tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml b/tests/integration_tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml deleted file mode 100644 index eddb6e580..000000000 --- a/tests/integration_tests/sample_pipeline_test/components/load_from_parquet/fondant_component.yaml +++ /dev/null @@ -1,23 +0,0 @@ -name: Load from parquet -description: Component that loads a dataset from a parquet uri -image: fndnt/load_from_parquet:dev - -produces: - text_data: - type: string - -args: - dataset_uri: - description: The remote path to the parquet file/folder containing the dataset - type: str - column_name_mapping: - description: Mapping of the consumed dataset - type: dict - default: {} - n_rows_to_load: - description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale - type: int - index_column: - description: Column to set index to in the load component, if not specified a default globally unique index will be set - type: str - default: None \ No newline at end of file diff --git a/tests/integration_tests/test_sample_pipeline.py b/tests/integration_tests/test_sample_pipeline.py deleted file mode 100644 index 39b6b732f..000000000 --- a/tests/integration_tests/test_sample_pipeline.py +++ /dev/null @@ -1,79 +0,0 @@ -# This file contains a sample pipeline. Loading data from a parquet file, -# using the load_from_parquet component, chain a custom dummy component, and use -# the reusable chunking component -import glob -import logging -import os -from pathlib import Path - -import pytest -from fondant.pipeline import Pipeline -from fondant.pipeline.compiler import DockerCompiler -from fondant.pipeline.runner import DockerRunner - -logger = logging.getLogger(__name__) - -# TODO: probably removable after we have solved #344 -# work around to make test executable on M1 Macbooks -os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64" - -BASE_PATH = Path("./tests/integration_tests/sample_pipeline_test") -NUMBER_OF_COMPONENTS = 3 - - -@pytest.fixture() -def sample_pipeline(data_dir="./data") -> Pipeline: - # Define pipeline - pipeline = Pipeline(name="dummy-pipeline", base_path=data_dir) - - # Load from hub component - load_component_column_mapping = { - "text": "text_data", - } - - dataset = pipeline.read( - name_or_path=Path(BASE_PATH / "components" / "load_from_parquet"), - arguments={ - "dataset_uri": "/data/sample.parquet", - "column_name_mapping": load_component_column_mapping, - "n_rows_to_load": 5, - }, - ) - - dataset = dataset.apply( - name_or_path=Path(BASE_PATH / "components" / "dummy_component"), - ) - - dataset.apply( - name_or_path="chunk_text", - arguments={"chunk_size": 10, "chunk_overlap": 2}, - ) - - return pipeline - - -@pytest.mark.skip(reason="Skipping due to random failure.") -def test_local_runner(sample_pipeline, tmp_path_factory): - with tmp_path_factory.mktemp("temp") as data_dir: - sample_pipeline.base_path = str(data_dir) - DockerCompiler().compile( - sample_pipeline, - output_path="docker-compose.yaml", - extra_volumes=[ - str(Path("tests/integration_tests/sample_pipeline_test/data").resolve()) - + ":/data", - ], - ) - DockerRunner().run("docker-compose.yaml") - - assert os.path.exists(data_dir / "dummy-pipeline") - assert os.path.exists(data_dir / "dummy-pipeline" / "cache") - pipeline_dirs = glob.glob( - str(data_dir / "dummy-pipeline" / "dummy-pipeline-*" / "*"), - ) - - assert len(pipeline_dirs) == NUMBER_OF_COMPONENTS - for dir in pipeline_dirs: - assert os.path.exists(Path(dir) / "index") - assert os.path.exists(Path(dir) / "text") - assert os.path.exists(Path(dir) / "manifest.json") From c168a2a6a372a38aef59aa02073fb3aa450264a8 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 11:37:56 +0100 Subject: [PATCH 02/22] Remove empty lines --- scripts/run_sample_pipeline.sh | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 7eb847c45..3e4750d68 100644 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,7 +15,4 @@ cleanup() { trap cleanup EXIT echo "Temporary directory created: $artifact_directory" -fondant run local examples/sample_pipeline.py - - - +fondant run local examples/sample_pipeline.py \ No newline at end of file From 443907a5632074b982f4652be4d2896dabf94385 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 11:41:18 +0100 Subject: [PATCH 03/22] Fix ci/cd --- .github/workflows/pipeline.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index f14f4aeaf..77fcd1419 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -47,7 +47,7 @@ jobs: poetry install --all-extras --with test - name: Execute sample pipeline - run: ./scripts//run_sample_pipeline.sh + run: ./scripts/run_sample_pipeline.sh finish-coveralls: needs: test From 8d2ad72ffba3f87d968468fa7c6830d3e0b65d5d Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 11:46:07 +0100 Subject: [PATCH 04/22] Fix ci/cd --- scripts/run_sample_pipeline.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 3e4750d68..a8d20640c 100644 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -1,6 +1,7 @@ #!/bin/bash # This script executes the sample pipeline in the example folder, checks the correct execution and # cleans up the directory again +set -e cleanup() { # Create a temporary directory From 51cdb48f4ce00e167e24f0b24fdd86a53a0ddf31 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 11:59:03 +0100 Subject: [PATCH 05/22] Fix ci/cd --- scripts/run_sample_pipeline.sh | 1 - 1 file changed, 1 deletion(-) mode change 100644 => 100755 scripts/run_sample_pipeline.sh diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh old mode 100644 new mode 100755 index a8d20640c..816097a18 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,5 +15,4 @@ cleanup() { trap cleanup EXIT -echo "Temporary directory created: $artifact_directory" fondant run local examples/sample_pipeline.py \ No newline at end of file From 7cf0eacdffa2c21b078e3d57d49bd444a562b32a Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 12:06:10 +0100 Subject: [PATCH 06/22] Fix ci/cd --- scripts/run_sample_pipeline.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 816097a18..e0b9fabc1 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,4 +15,4 @@ cleanup() { trap cleanup EXIT -fondant run local examples/sample_pipeline.py \ No newline at end of file +poetry run fondant run local examples/sample_pipeline.py \ No newline at end of file From 317826a64423900533e55a157d5e427abf59d01b Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 15:27:59 +0100 Subject: [PATCH 07/22] Fix ci/cd --- .../{ => sample_pipeline_test}/sample_pipeline.py | 0 scripts/run_sample_pipeline.sh | 13 ++++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) rename examples/{ => sample_pipeline_test}/sample_pipeline.py (100%) diff --git a/examples/sample_pipeline.py b/examples/sample_pipeline_test/sample_pipeline.py similarity index 100% rename from examples/sample_pipeline.py rename to examples/sample_pipeline_test/sample_pipeline.py diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index e0b9fabc1..56ac246b1 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,4 +15,15 @@ cleanup() { trap cleanup EXIT -poetry run fondant run local examples/sample_pipeline.py \ No newline at end of file +poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py + +# Expect that .artifacts was created and isn't empty +if [ -d "./examples/sample_pipeline_test/.artifacts" ]; then + if [ "$(ls -A .artifacts)" ]; then + echo "Sample pipeline executed successfully." + exit 0 + fi +fi + +echo "Sample pipeline execution failed." +exit 1 From 189bfabeddf1e5d63e98c887b79dd02a8de9a4b5 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 15:36:43 +0100 Subject: [PATCH 08/22] Fix ci/cd --- docs/runners/local.md | 2 +- scripts/run_sample_pipeline.sh | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/runners/local.md b/docs/runners/local.md index b412b825b..8e9584d77 100644 --- a/docs/runners/local.md +++ b/docs/runners/local.md @@ -46,7 +46,7 @@ about this in the [installation](../guides/installation.md) guide. fondant run local --auth-azure ``` - You can also use the `--extra_volumes` argument to mount extra credentials or additional files. + You can also use the `--extra-volumes` argument to mount extra credentials or additional files. This volumes will be mounted to every component/service of the docker-compose spec. diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 56ac246b1..eaf346257 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,7 +15,8 @@ cleanup() { trap cleanup EXIT -poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py +poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ + --extra-volumes ./examples/sample_pipeline_test/data:/data # Expect that .artifacts was created and isn't empty if [ -d "./examples/sample_pipeline_test/.artifacts" ]; then From abb88ce96fcee208d7c109ef692d60d97b1436a0 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 15:44:43 +0100 Subject: [PATCH 09/22] Fix ci/cd --- .github/workflows/pipeline.yaml | 7 +++++-- scripts/run_sample_pipeline.sh | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 77fcd1419..4cc756fc7 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -34,12 +34,15 @@ jobs: integration-test: runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ '3.8', '3.9', '3.10' ] steps: - uses: actions/checkout@v2 - - name: Set up Python 3.9 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v1 with: - python-version: 3.9 + python-version: ${{ matrix.python-version }} - name: Install dependencies run: | pip install --upgrade pip diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index eaf346257..a83fd8072 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -15,8 +15,10 @@ cleanup() { trap cleanup EXIT +resolved_path=$(readlink -f "examples/sample_pipeline_test/data") + poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ - --extra-volumes ./examples/sample_pipeline_test/data:/data + --extra-volumes $resolved_path:/data # Expect that .artifacts was created and isn't empty if [ -d "./examples/sample_pipeline_test/.artifacts" ]; then From 99b481cf99cad865ad2dd7fa1a5f4d4fb696f88a Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 3 Jan 2024 21:38:37 +0100 Subject: [PATCH 10/22] Fix ci/cd --- .../components/dummy_component/Dockerfile | 25 ++++++------------- .../dummy_component/fondant_component.yaml | 3 +-- .../dummy_component/requirements.txt | 2 +- .../components/dummy_component/src/main.py | 2 +- .../sample_pipeline_test/sample_pipeline.py | 3 +-- scripts/run_sample_pipeline.sh | 2 +- 6 files changed, 12 insertions(+), 25 deletions(-) diff --git a/examples/sample_pipeline_test/components/dummy_component/Dockerfile b/examples/sample_pipeline_test/components/dummy_component/Dockerfile index c39ada80e..c93aa29b8 100644 --- a/examples/sample_pipeline_test/components/dummy_component/Dockerfile +++ b/examples/sample_pipeline_test/components/dummy_component/Dockerfile @@ -1,24 +1,13 @@ -FROM --platform=linux/amd64 python:3.8-slim as base +FROM --platform=linux/amd64 python:3.8-slim -# System dependencies -RUN apt-get update && \ - apt-get upgrade -y && \ - apt-get install git -y - -# Install requirements -COPY requirements.txt / +# install requirements +COPY requirements.txt ./ RUN pip3 install --no-cache-dir -r requirements.txt -# Install Fondant -# This is split from other requirements to leverage caching -ARG FONDANT_VERSION=main -RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} - # Set the working directory to the component folder -WORKDIR /component -COPY src/ src/ -ENV PYTHONPATH "${PYTHONPATH}:./src" - -FROM base WORKDIR /component/src + +# Copy over src-files and spec of the component +COPY src/ . + ENTRYPOINT ["fondant", "execute", "main"] \ No newline at end of file diff --git a/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml b/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml index 0a041fa3d..ada48083e 100644 --- a/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml +++ b/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml @@ -1,7 +1,6 @@ name: Dummy component description: Dummy component for testing custom components - -image: fndnt/dummy_component:dev +image: dummy_component consumes: text_data: diff --git a/examples/sample_pipeline_test/components/dummy_component/requirements.txt b/examples/sample_pipeline_test/components/dummy_component/requirements.txt index 54b4390d1..7bf47e289 100644 --- a/examples/sample_pipeline_test/components/dummy_component/requirements.txt +++ b/examples/sample_pipeline_test/components/dummy_component/requirements.txt @@ -1 +1 @@ -langchain==0.0.329 \ No newline at end of file +fondant[component] \ No newline at end of file diff --git a/examples/sample_pipeline_test/components/dummy_component/src/main.py b/examples/sample_pipeline_test/components/dummy_component/src/main.py index bf0ddedcd..87fab4bbf 100644 --- a/examples/sample_pipeline_test/components/dummy_component/src/main.py +++ b/examples/sample_pipeline_test/components/dummy_component/src/main.py @@ -16,7 +16,7 @@ class DummyComponent(PandasTransformComponent): """Dummy component that returns the dataframe as it is.""" - def __init__(self, *_): + def __init__(self, *_, **kwargs): pass def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: diff --git a/examples/sample_pipeline_test/sample_pipeline.py b/examples/sample_pipeline_test/sample_pipeline.py index 17141b96d..aca78a804 100644 --- a/examples/sample_pipeline_test/sample_pipeline.py +++ b/examples/sample_pipeline_test/sample_pipeline.py @@ -1,8 +1,6 @@ # This file contains a sample pipeline. Loading data from a parquet file, # using the load_from_parquet component, chain a custom dummy component, and use # the reusable chunking component -import glob -import logging import os import pyarrow as pa from pathlib import Path @@ -10,6 +8,7 @@ os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64" BASE_PATH = Path("./examples/sample_pipeline_test") +Path(BASE_PATH / ".artifacts").mkdir(parents=True, exist_ok=True) NUMBER_OF_COMPONENTS = 3 # Define pipeline diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index a83fd8072..464bd2f88 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -22,7 +22,7 @@ poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ # Expect that .artifacts was created and isn't empty if [ -d "./examples/sample_pipeline_test/.artifacts" ]; then - if [ "$(ls -A .artifacts)" ]; then + if [ "$(ls -A ./examples/sample_pipeline_test/.artifacts)" ]; then echo "Sample pipeline executed successfully." exit 0 fi From d89c3aaf114a26d8e3cb7bb80bd6a225102a6f2d Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Jan 2024 09:05:25 +0100 Subject: [PATCH 11/22] Fix ci/cd --- .github/workflows/pipeline.yaml | 3 +-- examples/sample_pipeline_test/sample_pipeline.py | 2 -- scripts/run_sample_pipeline.sh | 2 ++ 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 4cc756fc7..34eb45332 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -46,8 +46,7 @@ jobs: - name: Install dependencies run: | pip install --upgrade pip - pip install poetry==1.4.0 - poetry install --all-extras --with test + pip install fondant - name: Execute sample pipeline run: ./scripts/run_sample_pipeline.sh diff --git a/examples/sample_pipeline_test/sample_pipeline.py b/examples/sample_pipeline_test/sample_pipeline.py index aca78a804..964c29de1 100644 --- a/examples/sample_pipeline_test/sample_pipeline.py +++ b/examples/sample_pipeline_test/sample_pipeline.py @@ -8,8 +8,6 @@ os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64" BASE_PATH = Path("./examples/sample_pipeline_test") -Path(BASE_PATH / ".artifacts").mkdir(parents=True, exist_ok=True) -NUMBER_OF_COMPONENTS = 3 # Define pipeline pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH / ".artifacts")) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 464bd2f88..d752d5c5b 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -3,6 +3,8 @@ # cleans up the directory again set -e +mkdir -p ./examples/sample_pipeline_test/.artifacts + cleanup() { # Create a temporary directory artifact_directory="./examples/sample_pipeline_test/.artifacts" From 459400fa1c4f63d5570c0df5cc6f64006be6e059 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Jan 2024 09:08:04 +0100 Subject: [PATCH 12/22] Fix ci/cd --- .github/workflows/pipeline.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 34eb45332..29f11072b 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -46,6 +46,7 @@ jobs: - name: Install dependencies run: | pip install --upgrade pip + pip install poetry==1.4.0 pip install fondant - name: Execute sample pipeline From f4ab0f00a3b06e1a43416ff2e50de29d00f0cd2f Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Jan 2024 09:08:51 +0100 Subject: [PATCH 13/22] Fix ci/cd --- .github/workflows/pipeline.yaml | 1 - scripts/run_sample_pipeline.sh | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 29f11072b..34eb45332 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -46,7 +46,6 @@ jobs: - name: Install dependencies run: | pip install --upgrade pip - pip install poetry==1.4.0 pip install fondant - name: Execute sample pipeline diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index d752d5c5b..09369b1dd 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -19,7 +19,7 @@ trap cleanup EXIT resolved_path=$(readlink -f "examples/sample_pipeline_test/data") -poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ +fondant run local examples/sample_pipeline_test/sample_pipeline.py \ --extra-volumes $resolved_path:/data # Expect that .artifacts was created and isn't empty From a2c1add8ae76e32636017542b6b16644b9ca845d Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Jan 2024 09:11:20 +0100 Subject: [PATCH 14/22] Fix ci/cd --- scripts/run_sample_pipeline.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 09369b1dd..a24e18094 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -10,7 +10,7 @@ cleanup() { artifact_directory="./examples/sample_pipeline_test/.artifacts" # Check if the temporary directory exists before attempting to remove it if [ -d "$artifact_directory" ]; then - rm -r "$artifact_directory" + sudo rm -r "$artifact_directory" echo "Temporary directory removed" fi } From 178c2ea9d326ed6120adcdca93f6912b03b25b64 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Jan 2024 11:16:01 +0100 Subject: [PATCH 15/22] Addressing comments --- scripts/run_sample_pipeline.sh | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index a24e18094..5a1a86285 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -6,6 +6,8 @@ set -e mkdir -p ./examples/sample_pipeline_test/.artifacts cleanup() { + rv=$? + # Create a temporary directory artifact_directory="./examples/sample_pipeline_test/.artifacts" # Check if the temporary directory exists before attempting to remove it @@ -13,6 +15,8 @@ cleanup() { sudo rm -r "$artifact_directory" echo "Temporary directory removed" fi + + exit $rv } trap cleanup EXIT @@ -22,13 +26,11 @@ resolved_path=$(readlink -f "examples/sample_pipeline_test/data") fondant run local examples/sample_pipeline_test/sample_pipeline.py \ --extra-volumes $resolved_path:/data -# Expect that .artifacts was created and isn't empty -if [ -d "./examples/sample_pipeline_test/.artifacts" ]; then - if [ "$(ls -A ./examples/sample_pipeline_test/.artifacts)" ]; then - echo "Sample pipeline executed successfully." - exit 0 - fi +if [ "$(ls -A ./examples/sample_pipeline_test/.artifacts)" ]; then + echo "Sample pipeline executed successfully." + exit 0 fi + echo "Sample pipeline execution failed." exit 1 From 3f6dfff6c366bba61488e6a33d1cef4c22a51046 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Mon, 8 Jan 2024 10:00:40 +0100 Subject: [PATCH 16/22] Update pipeline with poetry --- .github/workflows/pipeline.yaml | 3 ++- scripts/run_sample_pipeline.sh | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 34eb45332..4cc756fc7 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -46,7 +46,8 @@ jobs: - name: Install dependencies run: | pip install --upgrade pip - pip install fondant + pip install poetry==1.4.0 + poetry install --all-extras --with test - name: Execute sample pipeline run: ./scripts/run_sample_pipeline.sh diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh index 5a1a86285..3a9103eb3 100755 --- a/scripts/run_sample_pipeline.sh +++ b/scripts/run_sample_pipeline.sh @@ -23,7 +23,7 @@ trap cleanup EXIT resolved_path=$(readlink -f "examples/sample_pipeline_test/data") -fondant run local examples/sample_pipeline_test/sample_pipeline.py \ +poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ --extra-volumes $resolved_path:/data if [ "$(ls -A ./examples/sample_pipeline_test/.artifacts)" ]; then From 165980701bb39d5b23df1c6f664ae843f13b3475 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 10 Jan 2024 14:45:51 +0100 Subject: [PATCH 17/22] Addressing comments --- .github/workflows/pipeline.yaml | 2 +- examples/sample_pipeline/README.md | 13 +++++++ .../components/dummy_component/Dockerfile | 0 .../components/dummy_component/README.md | 0 .../dummy_component/fondant_component.yaml | 0 .../dummy_component/requirements.txt | 0 .../components/dummy_component/src/main.py | 1 + .../data/sample.parquet | Bin .../pipeline.py} | 9 ++--- examples/sample_pipeline/run.sh | 29 ++++++++++++++ scripts/run_integration_tests.sh | 34 +++++++++++++++++ scripts/run_sample_pipeline.sh | 36 ------------------ src/fondant/pipeline/runner.py | 2 +- tests/pipeline/test_runner.py | 4 +- tests/test_cli.py | 6 +-- 15 files changed, 88 insertions(+), 48 deletions(-) create mode 100644 examples/sample_pipeline/README.md rename examples/{sample_pipeline_test => sample_pipeline}/components/dummy_component/Dockerfile (100%) rename examples/{sample_pipeline_test => sample_pipeline}/components/dummy_component/README.md (100%) rename examples/{sample_pipeline_test => sample_pipeline}/components/dummy_component/fondant_component.yaml (100%) rename examples/{sample_pipeline_test => sample_pipeline}/components/dummy_component/requirements.txt (100%) rename examples/{sample_pipeline_test => sample_pipeline}/components/dummy_component/src/main.py (95%) rename examples/{sample_pipeline_test => sample_pipeline}/data/sample.parquet (100%) rename examples/{sample_pipeline_test/sample_pipeline.py => sample_pipeline/pipeline.py} (81%) create mode 100644 examples/sample_pipeline/run.sh create mode 100755 scripts/run_integration_tests.sh delete mode 100755 scripts/run_sample_pipeline.sh diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 4cc756fc7..b3beaa0dd 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -50,7 +50,7 @@ jobs: poetry install --all-extras --with test - name: Execute sample pipeline - run: ./scripts/run_sample_pipeline.sh + run: ./scripts/run_integration_tests.sh finish-coveralls: needs: test diff --git a/examples/sample_pipeline/README.md b/examples/sample_pipeline/README.md new file mode 100644 index 000000000..6ab76c9eb --- /dev/null +++ b/examples/sample_pipeline/README.md @@ -0,0 +1,13 @@ +# Sample pipeline + +This example is a simple sample pipeline which uses two reusable components +(load_from_parquet, chunk_text), and a custom dummy component. The custom dummy component only +returns the received dataframe. + +The pipeline can be executed with the Fondant cli: + +```bash +fondant run local pipeline.py +``` + +The automated integration test will use the `run.sh` script. \ No newline at end of file diff --git a/examples/sample_pipeline_test/components/dummy_component/Dockerfile b/examples/sample_pipeline/components/dummy_component/Dockerfile similarity index 100% rename from examples/sample_pipeline_test/components/dummy_component/Dockerfile rename to examples/sample_pipeline/components/dummy_component/Dockerfile diff --git a/examples/sample_pipeline_test/components/dummy_component/README.md b/examples/sample_pipeline/components/dummy_component/README.md similarity index 100% rename from examples/sample_pipeline_test/components/dummy_component/README.md rename to examples/sample_pipeline/components/dummy_component/README.md diff --git a/examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml b/examples/sample_pipeline/components/dummy_component/fondant_component.yaml similarity index 100% rename from examples/sample_pipeline_test/components/dummy_component/fondant_component.yaml rename to examples/sample_pipeline/components/dummy_component/fondant_component.yaml diff --git a/examples/sample_pipeline_test/components/dummy_component/requirements.txt b/examples/sample_pipeline/components/dummy_component/requirements.txt similarity index 100% rename from examples/sample_pipeline_test/components/dummy_component/requirements.txt rename to examples/sample_pipeline/components/dummy_component/requirements.txt diff --git a/examples/sample_pipeline_test/components/dummy_component/src/main.py b/examples/sample_pipeline/components/dummy_component/src/main.py similarity index 95% rename from examples/sample_pipeline_test/components/dummy_component/src/main.py rename to examples/sample_pipeline/components/dummy_component/src/main.py index 87fab4bbf..f17dad2eb 100644 --- a/examples/sample_pipeline_test/components/dummy_component/src/main.py +++ b/examples/sample_pipeline/components/dummy_component/src/main.py @@ -21,4 +21,5 @@ def __init__(self, *_, **kwargs): def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: """Dummy component that returns the dataframe as it is.""" + # raise RuntimeError return dataframe diff --git a/examples/sample_pipeline_test/data/sample.parquet b/examples/sample_pipeline/data/sample.parquet similarity index 100% rename from examples/sample_pipeline_test/data/sample.parquet rename to examples/sample_pipeline/data/sample.parquet diff --git a/examples/sample_pipeline_test/sample_pipeline.py b/examples/sample_pipeline/pipeline.py similarity index 81% rename from examples/sample_pipeline_test/sample_pipeline.py rename to examples/sample_pipeline/pipeline.py index 964c29de1..b0acd9682 100644 --- a/examples/sample_pipeline_test/sample_pipeline.py +++ b/examples/sample_pipeline/pipeline.py @@ -1,16 +1,15 @@ # This file contains a sample pipeline. Loading data from a parquet file, # using the load_from_parquet component, chain a custom dummy component, and use # the reusable chunking component -import os import pyarrow as pa from pathlib import Path from fondant.pipeline import Pipeline -os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64" -BASE_PATH = Path("./examples/sample_pipeline_test") +BASE_PATH = Path("./.artifacts").resolve() +BASE_PATH.mkdir(parents=True, exist_ok=True) # Define pipeline -pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH / ".artifacts")) +pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH)) # Load from hub component load_component_column_mapping = { @@ -28,7 +27,7 @@ ) dataset = dataset.apply( - name_or_path=Path(BASE_PATH / "components" / "dummy_component"), + name_or_path="./components/dummy_component", ) dataset.apply( diff --git a/examples/sample_pipeline/run.sh b/examples/sample_pipeline/run.sh new file mode 100644 index 000000000..e6d43b811 --- /dev/null +++ b/examples/sample_pipeline/run.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# This script executes the sample pipeline in the example folder, checks the correct execution and +# cleans up the directory again +set -e + + +# Setup teardown +cleanup() { + rv=$? + + # Try to remove .artifact folder + artifact_directory="./.artifacts" + + if [ -d "$artifact_directory" ]; then + # Directory exists, remove it + rm -rf "$artifact_directory" + fi + + exit $rv +} + +trap cleanup EXIT + +# Bind local data directory to pipeline +data_dir=$(readlink -f "data") + +# Run pipeline +fondant run local pipeline.py \ + --extra-volumes $data_dir:/data diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh new file mode 100755 index 000000000..655843e39 --- /dev/null +++ b/scripts/run_integration_tests.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# This script executes the sample pipeline in the example folder, checks the correct execution and +# cleans up the directory again +echo "Start integration tests execution ..." + +failed_tests=() + +# Find all run.sh scripts and execute them +for test_script in ./examples/*/run.sh; do + test_name=$(basename "$(dirname "$test_script")") + + echo "Running test: $test_name" + + # Set working dir to the currect integration test + cd $(dirname "$test_script") + + # Execute the run.sh script + bash ./run.sh + + # Check the exit status + if [ $? -ne 0 ]; then + echo "Test $test_name failed!" + failed_tests+=("$test_name") + fi +done + +echo "Tests completed" + +if [ ${#failed_tests[@]} -eq 0 ]; then + echo "All tests passed!" +else + echo "Failed tests: ${failed_tests[@]}" + exit 1 # Indicate failure to cicd +fi diff --git a/scripts/run_sample_pipeline.sh b/scripts/run_sample_pipeline.sh deleted file mode 100755 index 3a9103eb3..000000000 --- a/scripts/run_sample_pipeline.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash -# This script executes the sample pipeline in the example folder, checks the correct execution and -# cleans up the directory again -set -e - -mkdir -p ./examples/sample_pipeline_test/.artifacts - -cleanup() { - rv=$? - - # Create a temporary directory - artifact_directory="./examples/sample_pipeline_test/.artifacts" - # Check if the temporary directory exists before attempting to remove it - if [ -d "$artifact_directory" ]; then - sudo rm -r "$artifact_directory" - echo "Temporary directory removed" - fi - - exit $rv -} - -trap cleanup EXIT - -resolved_path=$(readlink -f "examples/sample_pipeline_test/data") - -poetry run fondant run local examples/sample_pipeline_test/sample_pipeline.py \ - --extra-volumes $resolved_path:/data - -if [ "$(ls -A ./examples/sample_pipeline_test/.artifacts)" ]; then - echo "Sample pipeline executed successfully." - exit 0 -fi - - -echo "Sample pipeline execution failed." -exit 1 diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index 7483ac4ed..899ab73e0 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -41,7 +41,7 @@ def _run(self, input_spec: str, *args, **kwargs): ] print("Starting pipeline run...") - subprocess.call(cmd) # nosec + subprocess.check_call(cmd) # nosec print("Finished pipeline run.") def run( diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index 0a66b7be0..db77319ff 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -23,7 +23,7 @@ def test_docker_runner(): """Test that the docker runner while mocking subprocess.call.""" - with mock.patch("subprocess.call") as mock_call: + with mock.patch("subprocess.check_call") as mock_call: DockerRunner().run("some/path") mock_call.assert_called_once_with( [ @@ -41,7 +41,7 @@ def test_docker_runner(): def test_docker_runner_from_pipeline(): - with mock.patch("subprocess.call") as mock_call: + with mock.patch("subprocess.check_call") as mock_call: DockerRunner().run(PIPELINE) mock_call.assert_called_once_with( [ diff --git a/tests/test_cli.py b/tests/test_cli.py index 3b9c56072..20151e241 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -274,7 +274,7 @@ def test_local_run(): extra_volumes=[], build_arg=[], ) - with patch("subprocess.call") as mock_call: + with patch("subprocess.check_call") as mock_call: run_local(args) mock_call.assert_called_once_with( [ @@ -290,7 +290,7 @@ def test_local_run(): ], ) - with patch("subprocess.call") as mock_call: + with patch("subprocess.check_call") as mock_call: args1 = argparse.Namespace( local=True, vertex=False, @@ -330,7 +330,7 @@ def test_local_run_cloud_credentials(): with patch( "fondant.pipeline.compiler.DockerCompiler.compile", ) as mock_compiler, patch( - "subprocess.call", + "subprocess.check_call", ) as mock_runner: args = argparse.Namespace( local=True, From d0f7c5ef3c09dec3caef8b1dc15538950bd87ff6 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 10 Jan 2024 15:14:24 +0100 Subject: [PATCH 18/22] Use poetry run --- examples/sample_pipeline/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/sample_pipeline/run.sh b/examples/sample_pipeline/run.sh index e6d43b811..e10312ace 100644 --- a/examples/sample_pipeline/run.sh +++ b/examples/sample_pipeline/run.sh @@ -25,5 +25,5 @@ trap cleanup EXIT data_dir=$(readlink -f "data") # Run pipeline -fondant run local pipeline.py \ +poetry run fondant run local pipeline.py \ --extra-volumes $data_dir:/data From 8f81448f29900137f7c80bdcc181574c41e832b0 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 11 Jan 2024 10:51:10 +0100 Subject: [PATCH 19/22] Use git hash for building custom components --- .github/workflows/pipeline.yaml | 2 +- .../components/dummy_component/Dockerfile | 11 ++++++++++- .../components/dummy_component/requirements.txt | 1 - examples/sample_pipeline/run.sh | 3 ++- scripts/run_integration_tests.sh | 4 +++- 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index b3beaa0dd..f3014fccb 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -50,7 +50,7 @@ jobs: poetry install --all-extras --with test - name: Execute sample pipeline - run: ./scripts/run_integration_tests.sh + run: ./scripts/run_integration_tests.sh $GITHUB_SHA finish-coveralls: needs: test diff --git a/examples/sample_pipeline/components/dummy_component/Dockerfile b/examples/sample_pipeline/components/dummy_component/Dockerfile index c93aa29b8..e15bbb588 100644 --- a/examples/sample_pipeline/components/dummy_component/Dockerfile +++ b/examples/sample_pipeline/components/dummy_component/Dockerfile @@ -1,9 +1,18 @@ FROM --platform=linux/amd64 python:3.8-slim -# install requirements +# System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# Install requirements COPY requirements.txt ./ RUN pip3 install --no-cache-dir -r requirements.txt +# Install fondant +ARG FONDANT_VERSION=main +RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} + # Set the working directory to the component folder WORKDIR /component/src diff --git a/examples/sample_pipeline/components/dummy_component/requirements.txt b/examples/sample_pipeline/components/dummy_component/requirements.txt index 7bf47e289..e69de29bb 100644 --- a/examples/sample_pipeline/components/dummy_component/requirements.txt +++ b/examples/sample_pipeline/components/dummy_component/requirements.txt @@ -1 +0,0 @@ -fondant[component] \ No newline at end of file diff --git a/examples/sample_pipeline/run.sh b/examples/sample_pipeline/run.sh index e10312ace..1620d4ffd 100644 --- a/examples/sample_pipeline/run.sh +++ b/examples/sample_pipeline/run.sh @@ -2,6 +2,7 @@ # This script executes the sample pipeline in the example folder, checks the correct execution and # cleans up the directory again set -e +GIT_HASH=$1 # Setup teardown @@ -26,4 +27,4 @@ data_dir=$(readlink -f "data") # Run pipeline poetry run fondant run local pipeline.py \ - --extra-volumes $data_dir:/data + --extra-volumes $data_dir:/data --build-arg FONDANT_VERSION=$GIT_HASH diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh index 655843e39..0352050e9 100755 --- a/scripts/run_integration_tests.sh +++ b/scripts/run_integration_tests.sh @@ -1,6 +1,8 @@ #!/bin/bash # This script executes the sample pipeline in the example folder, checks the correct execution and # cleans up the directory again +GIT_HASH=$1 + echo "Start integration tests execution ..." failed_tests=() @@ -15,7 +17,7 @@ for test_script in ./examples/*/run.sh; do cd $(dirname "$test_script") # Execute the run.sh script - bash ./run.sh + bash ./run.sh $GIT_HASH # Check the exit status if [ $? -ne 0 ]; then From b42bbdd1ada6d4af8998346147fb63cd768e8129 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 11 Jan 2024 13:31:05 +0100 Subject: [PATCH 20/22] Update chunk text component --- components/chunk_text/fondant_component.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/chunk_text/fondant_component.yaml b/components/chunk_text/fondant_component.yaml index bb1d0088e..b673ef5ff 100644 --- a/components/chunk_text/fondant_component.yaml +++ b/components/chunk_text/fondant_component.yaml @@ -35,7 +35,7 @@ args: 'SentenceTransformersTokenTextSplitter', 'LatexTextSplitter', 'SpacyTextSplitter', 'TokenTextSplitter', 'NLTKTextSplitter', 'PythonCodeTextSplitter', 'character', 'NLTK', 'SpaCy'] - type: int + type: str default: RecursiveCharacterTextSplitter chunk_kwargs: description: The arguments to pass to the chunking strategy From 555a7a8c3f2fdafaefa83048a7283dd17c40e3f0 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 11 Jan 2024 15:17:56 +0100 Subject: [PATCH 21/22] Merge main into feature/move-integration-test --- docs/components/component_spec.md | 35 +++++++++++++++++ src/fondant/component/component.py | 3 ++ src/fondant/component/executor.py | 4 +- src/fondant/core/exceptions.py | 4 ++ src/fondant/pipeline/runner.py | 16 +++++--- tests/component/test_component.py | 63 ++++++++++++++++++++++++++++++ tests/examples/__init__.py | 0 tests/pipeline/test_runner.py | 43 +++++++++++++++++--- tests/test_cli.py | 40 +++++++++++++++---- 9 files changed, 188 insertions(+), 20 deletions(-) create mode 100644 tests/examples/__init__.py diff --git a/docs/components/component_spec.md b/docs/components/component_spec.md index 9299305a2..65e121c18 100644 --- a/docs/components/component_spec.md +++ b/docs/components/component_spec.md @@ -301,4 +301,39 @@ class ExampleComponent(PandasTransformComponent): Returns: A pandas dataframe containing the transformed data """ +``` + +Afterwards, we pass all keyword arguments to the `__init__()` method of the component. + + +You can also use the a `teardown()` method to perform any cleanup after the component has been executed. +This is a good place to close any open connections or files. + +```python +import pandas as pd +from fondant.component import PandasTransformComponent +from my_library import Client + + def __init__(self, *, client_url, **kwargs) -> None: + """ + Args: + x_argument: An argument passed to the component + """ + # Initialize your component here based on the arguments + self.client = Client(client_url) + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + """Implement your custom logic in this single method + + Args: + dataframe: A Pandas dataframe containing the data + + Returns: + A pandas dataframe containing the transformed data + """ + + def teardown(self): + """Perform any cleanup after the component has been executed + """ + self.client.shutdown() ``` \ No newline at end of file diff --git a/src/fondant/component/component.py b/src/fondant/component/component.py index 82d539b84..7d9a86113 100644 --- a/src/fondant/component/component.py +++ b/src/fondant/component/component.py @@ -26,6 +26,9 @@ def __init__( ): pass + def teardown(self) -> None: + """Method called after the component has been executed.""" + class DaskLoadComponent(BaseComponent): """Component that loads data and returns a Dask DataFrame.""" diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py index 3026eb625..db3140703 100644 --- a/src/fondant/component/executor.py +++ b/src/fondant/component/executor.py @@ -335,7 +335,7 @@ def _run_execution( input_manifest: Manifest, ) -> Manifest: logging.info("Executing component") - component = component_cls( + component: Component = component_cls( consumes=self.operation_spec.inner_consumes, produces=self.operation_spec.inner_produces, **self.user_arguments, @@ -350,6 +350,8 @@ def _run_execution( ) self._write_data(dataframe=output_df, manifest=output_manifest) + component.teardown() + return output_manifest def execute(self, component_cls: t.Type[Component]) -> None: diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index 4143f389a..5560b9aab 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -25,3 +25,7 @@ class InvalidTypeSchema(ValidationError, FondantException): class UnsupportedTypeAnnotation(FondantException): """Thrown when an unsupported type annotation is encountered during type inference.""" + + +class PipelineRunError(ValidationError, FondantException): + """Thrown when a pipeline run results in an error.""" diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index 0ce3ea568..62b158c1c 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -6,6 +6,7 @@ import yaml +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.compiler import ( DockerCompiler, @@ -38,15 +39,23 @@ def _run(self, input_spec: str, *args, **kwargs): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ] print("Starting pipeline run...") # copy the current environment with the DOCKER_DEFAULT_PLATFORM argument - subprocess.check_call( # nosec + output = subprocess.run( # nosec cmd, env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) + + if output.returncode != 0: + msg = f"Command failed with error: '{output.stderr}'" + raise PipelineRunError(msg) + print("Finished pipeline run.") def run( @@ -55,12 +64,11 @@ def run( *, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, - ) -> None: + ): """Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline. Args: input: the pipeline to compile or a path to a already compiled docker-compose spec - output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) to mount in the docker-compose spec. @@ -258,8 +266,6 @@ def run( pipeline_name: the name of the pipeline to create role_arn: the Amazon Resource Name role to use for the processing steps, if none provided the `sagemaker.get_execution_role()` role will be used. - instance_type: the instance type to use for the processing steps - (see: https://aws.amazon.com/ec2/instance-types/ for options). """ if isinstance(input, Pipeline): os.makedirs(".fondant", exist_ok=True) diff --git a/tests/component/test_component.py b/tests/component/test_component.py index 397ab210e..191e6e329 100644 --- a/tests/component/test_component.py +++ b/tests/component/test_component.py @@ -298,6 +298,69 @@ def load(self): load.mock.assert_called_once() +@pytest.mark.usefixtures("_patched_data_writing") +def test_teardown_method(metadata): + # Mock CLI arguments load + operation_spec = OperationSpec( + ComponentSpec.from_file(components_path / "component.yaml"), + ) + + sys.argv = [ + "", + "--metadata", + metadata.to_json(), + "--flag", + "success", + "--value", + "1", + "--output_manifest_path", + str(components_path / "output_manifest.json"), + "--operation_spec", + operation_spec.to_json(), + "--cache", + "False", + "--produces", + "{}", + ] + + class MockClient: + def __init__(self): + self.is_connected = True + + def shutdown(self): + if self.is_connected: + self.is_connected = False + + client = MockClient() + + class MyLoadComponent(DaskLoadComponent): + def __init__(self, *, flag, value, **kwargs): + self.flag = flag + self.value = value + self.client = client + + def load(self): + data = { + "id": [0, 1], + "captions_data": ["hello world", "this is another caption"], + } + return dd.DataFrame.from_dict(data, npartitions=N_PARTITIONS) + + def teardown(self) -> None: + self.client.shutdown() + + executor_factory = ExecutorFactory(MyLoadComponent) + executor = executor_factory.get_executor() + assert executor.input_partition_rows is None + + teardown = patch_method_class(MyLoadComponent.teardown) + assert client.is_connected is True + with mock.patch.object(MyLoadComponent, "teardown", teardown): + executor.execute(MyLoadComponent) + teardown.mock.assert_called_once() + assert client.is_connected is False + + @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") def test_dask_transform_component(metadata): operation_spec = OperationSpec( diff --git a/tests/examples/__init__.py b/tests/examples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index cdec89989..be7c736af 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -5,6 +5,7 @@ from unittest import mock import pytest +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.runner import ( DockerRunner, @@ -22,11 +23,23 @@ ) -def test_docker_runner(): +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + +def test_docker_runner(mock_subprocess_run): """Test that the docker runner while mocking subprocess.call.""" - with mock.patch("subprocess.check_call") as mock_call: + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run("some/path") - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -37,15 +50,19 @@ def test_docker_runner(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_docker_runner_from_pipeline(): - with mock.patch("subprocess.check_call") as mock_call: +def test_docker_runner_from_pipeline(mock_subprocess_run): + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run(PIPELINE) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -56,11 +73,25 @@ def test_docker_runner_from_pipeline(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) +def test_invalid_docker_run(): + """Test that the docker runner throws the correct error.""" + spec_path = "some/path" + resolved_spec_path = str(Path(spec_path).resolve()) + with pytest.raises( + PipelineRunError, + match=f"stat {resolved_spec_path}: no such file or directory", + ): + DockerRunner().run(spec_path) + + class MockKfpClient: def __init__(self, host): self.host = host diff --git a/tests/test_cli.py b/tests/test_cli.py index fc73915cc..23a4e2d0c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -44,6 +44,17 @@ def load(self): pass +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + @pytest.mark.parametrize("command", commands) def test_basic_invocation(command): """Test that the CLI (sub)commands can be invoked without errors.""" @@ -262,7 +273,7 @@ def test_sagemaker_compile(tmp_path_factory): ) -def test_local_run(): +def test_local_run(mock_subprocess_run): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, @@ -275,9 +286,11 @@ def test_local_run(): extra_volumes=[], build_arg=[], ) - with patch("subprocess.check_call") as mock_call: + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run run_local(args) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -288,11 +301,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) - with patch("subprocess.check_call") as mock_call: + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run args1 = argparse.Namespace( local=True, vertex=False, @@ -306,7 +323,7 @@ def test_local_run(): credentials=None, ) run_local(args1) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -317,12 +334,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_local_run_cloud_credentials(): +def test_local_run_cloud_credentials(mock_subprocess_run): namespace_creds_kwargs = [ {"auth_gcp": True, "auth_azure": False, "auth_aws": False}, {"auth_gcp": False, "auth_azure": True, "auth_aws": False}, @@ -333,8 +353,10 @@ def test_local_run_cloud_credentials(): with patch( "fondant.pipeline.compiler.DockerCompiler.compile", ) as mock_compiler, patch( - "subprocess.check_call", + "subprocess.run", ) as mock_runner: + mock_runner.side_effect = mock_subprocess_run + args = argparse.Namespace( local=True, vertex=False, @@ -360,7 +382,6 @@ def test_local_run_cloud_credentials(): output_path=".fondant/compose.yaml", build_args=[], ) - mock_runner.assert_called_once_with( [ "docker", @@ -372,8 +393,11 @@ def test_local_run_cloud_credentials(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) From e0154e117d52f3b635235ce6a1f9d550d9cf3186 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 11 Jan 2024 15:27:09 +0100 Subject: [PATCH 22/22] Update run script --- examples/sample_pipeline/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/sample_pipeline/run.sh b/examples/sample_pipeline/run.sh index 1620d4ffd..10a53c731 100644 --- a/examples/sample_pipeline/run.sh +++ b/examples/sample_pipeline/run.sh @@ -14,7 +14,9 @@ cleanup() { if [ -d "$artifact_directory" ]; then # Directory exists, remove it - rm -rf "$artifact_directory" + # Can't delete files in cicd pipeline due to missing permissions. Not necessarily needed there, + # but might be useful if you executing the script locally. + rm -rf "$artifact_directory" 2>/dev/null || true fi exit $rv